Skip to content

Commit

Permalink
Merge pull request #1606 from skalenetwork/ticket-1602/SNB-refresh-no…
Browse files Browse the repository at this point in the history
…t-working

ticket-1602 Fixed SNB refresh not working each 1 hour
  • Loading branch information
DmytroNazarenko authored Sep 30, 2023
2 parents 6d81fa0 + 009fc97 commit 3ab88c0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 52 deletions.
27 changes: 18 additions & 9 deletions agent/discoveryTools.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export function initialSkaleNetworkScanForS2S() {
log.write( strLogPrefix +
cc.debug( "Downloading SKALE network information..." ) + "\n" );
}
if( log.verboseGet() >= log.verboseReversed().information ) {
log.write( strLogPrefix +
cc.notice( "Will init periodic S-Chains caching now..." ) + "\n" );
}
const opts = {
imaState: imaState,
"details": log,
Expand All @@ -64,18 +68,23 @@ export function initialSkaleNetworkScanForS2S() {
"bParallelModeRefreshSNB": ( !!( imaState.optsS2S.bParallelModeRefreshSNB ) ),
"isForceMultiAttemptsUntilSuccess": true
};
if( log.verboseGet() >= log.verboseReversed().information ) {
log.write( strLogPrefix +
cc.debug( "Will start periodic S-Chains caching..." ) + "\n" );
}
await skaleObserver.periodicCachingStart(
imaState.chainProperties.sc.strChainName,
opts
);
if( log.verboseGet() >= log.verboseReversed().information ) {
log.write( strLogPrefix +
cc.success( "Done, did started periodic S-Chains caching." ) + "\n" );
}
).then( function() {
if( log.verboseGet() >= log.verboseReversed().information ) {
log.write( strLogPrefix +
cc.success( "Done, did started periodic S-Chains caching." ) +
"\n" );
}
} ).catch( function( err ) {
if( log.verboseGet() >= log.verboseReversed().error ) {
const strError = owaspUtils.extractErrorMessage( err );
log.write( cc.fatal( "CRITICAL ERROR:" ) +
cc.error( " failed to start periodic S-Chains caching" ) +
cc.warning( strError ) + "\n" );
}
} );
return true;
}
} );
Expand Down
77 changes: 46 additions & 31 deletions agent/loop.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,20 @@ function constructChainProperties( opts ) {
};
}

function getDefaultOptsLoop( idxWorker ) {
const optsLoop = {
joRuntimeOpts: {
isInsideWorker: true, idxChainKnownForS2S: 0, cntChainsKnownForS2S: 0
},
isDelayFirstRun: false,
enableStepOracle: ( idxWorker == 0 ) ? true : false,
enableStepM2S: ( idxWorker == 0 ) ? true : false,
enableStepS2M: ( idxWorker == 1 ) ? true : false,
enableStepS2S: ( idxWorker == 0 ) ? true : false
};
return optsLoop;
}

export async function ensureHaveWorkers( opts ) {
if( gArrWorkers.length > 0 )
return gArrWorkers;
Expand All @@ -668,53 +682,54 @@ export async function ensureHaveWorkers( opts ) {
const workerData = {
url: "ima_loop_server" + idxWorker, cc: { isEnabled: cc.isEnabled() }
};
gArrWorkers.push(
new threadInfo.Worker(
path.join( __dirname, "loopWorker.mjs" ),
{ "type": "module", "workerData": workerData }
)
);
gArrWorkers.push( new threadInfo.Worker(
path.join( __dirname, "loopWorker.mjs" ),
{ "type": "module", "workerData": workerData }
) );
gArrWorkers[idxWorker].on( "message", jo => {
if( networkLayer.outOfWorkerAPIs.onMessage( gArrWorkers[idxWorker], jo ) )
return;
} );
gArrClients.push( new networkLayer.OutOfWorkerSocketClientPipe(
workerData.url, gArrWorkers[idxWorker] ) );
gArrClients[idxWorker].on( "message", async function( eventData ) {
const aClient = new networkLayer.OutOfWorkerSocketClientPipe(
workerData.url, gArrWorkers[idxWorker] );
gArrClients.push( aClient );
aClient.logicalInitComplete = false;
aClient.errorLogicalInit = null;
aClient.on( "message", async function( eventData ) {
const joMessage = eventData.message;
switch ( joMessage.method ) {
case "init":
if( ! joMessage.error ) {
aClient.logicalInitComplete = true;
break;
}
aClient.errorLogicalInit = joMessage.error;
if( log.verboseGet() >= log.verboseReversed().critical ) {
opts.details.write( cc.fatal( "CRITICAL ERROR:" ) + " " +
cc.debug( "Loop worker thread " ) + cc.info( idxWorker ) +
cc.debug( " reported/returned init error:" ) + " " +
cc.warning( owaspUtils.extractErrorMessage( joMessage.error ) ) + "\n" );
}
break;
case "log":
log.write( cc.attention( "LOOP WORKER" ) +
" " + cc.notice( workerData.url ) + " " + joMessage.message + "\n" );
break;
case "saveTransferError":
imaTransferErrorHandling.saveTransferError(
joMessage.message.category,
joMessage.message.textLog,
joMessage.message.ts );
joMessage.message.category, joMessage.message.textLog, joMessage.message.ts );
break;
case "saveTransferSuccess":
imaTransferErrorHandling.saveTransferSuccess( joMessage.message.category );
break;
} // switch ( joMessage.method )
} );
await threadInfo.sleep( 3 * 1000 );
const optsLoop = {
joRuntimeOpts: {
isInsideWorker: true, idxChainKnownForS2S: 0, cntChainsKnownForS2S: 0
},
isDelayFirstRun: false,
enableStepOracle: ( idxWorker == 0 ) ? true : false,
enableStepM2S: ( idxWorker == 0 ) ? true : false,
enableStepS2M: ( idxWorker == 1 ) ? true : false,
enableStepS2S: ( idxWorker == 0 ) ? true : false
};
const jo = {
"method": "init",
"message": {
"opts": {
"imaState": {
"optsLoop": optsLoop,
"optsLoop": getDefaultOptsLoop( idxWorker ),
"verbose_": log.verboseGet(),
"expose_details_": log.exposeDetailsGet(),
"arrSChainsCached": skaleObserver.getLastCachedSChains(),
Expand Down Expand Up @@ -791,18 +806,14 @@ export async function ensureHaveWorkers( opts ) {
"chainProperties": constructChainProperties( opts ),
"joAbiSkaleManager": opts.imaState.joAbiSkaleManager,
"bHaveSkaleManagerABI": opts.imaState.bHaveSkaleManagerABI,

"strChainNameOriginChain": opts.imaState.strChainNameOriginChain,

"isPWA": opts.imaState.isPWA,
"nTimeoutSecondsPWA": opts.imaState.nTimeoutSecondsPWA,

"strReimbursementChain": opts.imaState.strReimbursementChain,
"isShowReimbursementBalance": opts.imaState.isShowReimbursementBalance,
"nReimbursementRecharge": opts.imaState.nReimbursementRecharge,
"nReimbursementWithdraw": opts.imaState.nReimbursementWithdraw,
"nReimbursementRange": opts.imaState.nReimbursementRange,

"joSChainDiscovery": {
"isSilentReDiscovery":
opts.imaState.joSChainDiscovery.isSilentReDiscovery,
Expand All @@ -811,7 +822,6 @@ export async function ensureHaveWorkers( opts ) {
"periodicDiscoveryInterval":
opts.imaState.joSChainDiscovery.periodicDiscoveryInterval
},

"optsS2S": { // S-Chain to S-Chain transfer options
"isEnabled": true,
"bParallelModeRefreshSNB":
Expand All @@ -821,15 +831,20 @@ export async function ensureHaveWorkers( opts ) {
"secondsToWaitForSkaleNetworkDiscovered":
opts.imaState.optsS2S.secondsToWaitForSkaleNetworkDiscovered
},

"nJsonRpcPort": opts.imaState.nJsonRpcPort,
"isCrossImaBlsMode": opts.imaState.isCrossImaBlsMode
}
},
"cc": { "isEnabled": cc.isEnabled() }
}
};
gArrClients[idxWorker].send( jo );
while( ! aClient.logicalInitComplete ) {
if( log.verboseGet() >= log.verboseReversed().info )
log.write( "LOOP server is not inited yet...\n" );

await threadInfo.sleep( 1000 );
aClient.send( jo );
}
}
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "Loop module did created its " ) +
Expand Down
12 changes: 8 additions & 4 deletions agent/loopWorker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ class ObserverServer extends SocketServer {
constructor( acceptor ) {
super( acceptor );
const self = this;
self.initComplete = false;
cc.enable( workerData.cc.isEnabled );
self.opts = null;
self.intervalPeriodicSchainsCaching = null;
self.bIsPeriodicCachingStepInProgress = false;
self.mapApiHandlers.init = function( joMessage, joAnswer, eventData, socket ) {
joAnswer.message = {
"method": "" + joMessage.method,
"error": null
};
if( self.initComplete )
return joAnswer;
self.log = function() {
const args = Array.prototype.slice.call( arguments );
const jo = {
Expand Down Expand Up @@ -107,10 +114,6 @@ class ObserverServer extends SocketServer {
cc.debug( " will save cached S-Chains..." ) + "\n" );
}
skaleObserver.setLastCachedSChains( self.opts.imaState.arrSChainsCached );
joAnswer.message = {
"method": "" + joMessage.method,
"error": null
};
self.opts.imaState.chainProperties.mn.joAccount.address = owaspUtils.fnAddressImpl_;
self.opts.imaState.chainProperties.sc.joAccount.address = owaspUtils.fnAddressImpl_;
if( self.opts.imaState.chainProperties.mn.strURL &&
Expand Down Expand Up @@ -160,6 +163,7 @@ class ObserverServer extends SocketServer {
imaTx.getTransactionCustomizerForSChainTarget();
state.set( imaState );
imaCLI.initContracts();
self.initComplete = true;
if( log.verboseGet() >= log.verboseReversed().information ) {
self.log( cc.debug( "IMA loop worker" ) + " " + cc.notice( workerData.url ) +
cc.debug( " will do the following work:" ) + "\n" + " " +
Expand Down
28 changes: 24 additions & 4 deletions npms/skale-observer/observer.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,9 +1197,23 @@ export async function ensureHaveWorker( opts ) {
return;
} );
gClient = new networkLayer.OutOfWorkerSocketClientPipe( url, gWorker );
gClient.logicalInitComplete = false;
gClient.errorLogicalInit = null;
gClient.on( "message", function( eventData ) {
const joMessage = eventData.message;
switch ( joMessage.method ) {
case "init":
if( ! joMessage.error ) {
gClient.logicalInitComplete = true;
break;
}
gClient.errorLogicalInit = joMessage.error;
if( log.verboseGet() >= log.verboseReversed().critical ) {
opts.details.write( cc.fatal( "CRITICAL ERROR:" ) + " " +
cc.debug( "SNB worker thread reported/returned init error:" ) + " " +
cc.warning( owaspUtils.extractErrorMessage( joMessage.error ) ) + "\n" );
}
break;
case "periodicCachingDoNow":
if( log.verboseGet() >= log.verboseReversed().debug ) {
opts.details.write(
Expand All @@ -1222,7 +1236,6 @@ export async function ensureHaveWorker( opts ) {
break;
} // switch ( joMessage.method )
} );
await threadInfo.sleep( 1000 );
const jo = {
"method": "init",
"message": {
Expand Down Expand Up @@ -1318,7 +1331,13 @@ export async function ensureHaveWorker( opts ) {
}
}
};
gClient.send( jo );
while( ! gClient.logicalInitComplete ) {
if( log.verboseGet() >= log.verboseReversed().info )
log.write( "SNB server is not inited yet...\n" );

await threadInfo.sleep( 1000 );
gClient.send( jo );
}
}

async function inThreadPeriodicCachingStart( strChainNameConnectedTo, opts ) {
Expand Down Expand Up @@ -1350,6 +1369,9 @@ async function parallelPeriodicCachingStart( strChainNameConnectedTo, opts ) {
try {
const nSecondsToWaitParallel = ( opts.secondsToWaitForSkaleNetworkDiscovered > 0 )
? opts.secondsToWaitForSkaleNetworkDiscovered : ( 2 * 60 );
owaspUtils.ensureObserverOptionsInitialized( opts );
await ensureHaveWorker( opts );
await threadInfo.sleep( 5 * 1000 );
let iv = null;
iv = setTimeout( function() {
if( iv ) {
Expand All @@ -1368,8 +1390,6 @@ async function parallelPeriodicCachingStart( strChainNameConnectedTo, opts ) {
periodicCachingStop();
inThreadPeriodicCachingStart( strChainNameConnectedTo, opts );
}, nSecondsToWaitParallel * 1000 );
owaspUtils.ensureObserverOptionsInitialized( opts );
await ensureHaveWorker( opts );
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( threadInfo.threadDescription() +
cc.debug( " will inform worker thread to start periodic SNB refresh each " ) +
Expand Down
12 changes: 8 additions & 4 deletions npms/skale-observer/observerWorker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,17 @@ class ObserverServer extends SocketServer {
constructor( acceptor ) {
super( acceptor );
const self = this;
self.initComplete = false;
self.opts = null;
self.intervalPeriodicSchainsCaching = null;
self.bIsPeriodicCachingStepInProgress = false;
self.mapApiHandlers.init = function( joMessage, joAnswer, eventData, socket ) {
joAnswer.message = {
"method": "" + joMessage.method,
"error": null
};
if( self.initComplete )
return joAnswer;
self.log = function() {
const args = Array.prototype.slice.call( arguments );
const jo = {
Expand All @@ -83,10 +90,6 @@ class ObserverServer extends SocketServer {
cc.enable( joMessage.message.cc.isEnabled );
log.verboseSet( self.opts.imaState.verbose_ );
log.exposeDetailsSet( self.opts.imaState.expose_details_ );
joAnswer.message = {
"method": "" + joMessage.method,
"error": null
};
self.opts.imaState.chainProperties.mn.joAccount.address =
owaspUtils.fnAddressImpl_;
self.opts.imaState.chainProperties.sc.joAccount.address =
Expand Down Expand Up @@ -149,6 +152,7 @@ class ObserverServer extends SocketServer {
self.opts.imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
self.opts.imaState.chainProperties.sc.ethersProvider
);
self.initComplete = true;
if( log.verboseGet() >= log.verboseReversed().information ) {
self.log( cc.debug( "Full init compete for in-worker SNB server in " ) +
threadInfo.threadDescription() + " " +
Expand Down

0 comments on commit 3ab88c0

Please sign in to comment.