diff --git a/package-lock.json b/package-lock.json index 90f9944f5d..9af8e44640 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.2.2", + "version": "6.2.3", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.2.2", + "version": "6.2.3", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", @@ -4686,9 +4686,9 @@ } }, "node_modules/@openzeppelin/contracts": { - "version": "4.9.5", - "resolved": "https://registry.npmjs.org/@openzeppelin/contracts/-/contracts-4.9.5.tgz", - "integrity": "sha512-ZK+W5mVhRppff9BE6YdR8CC52C8zAvsVAiWhEtQ5+oNxFE6h1WdeWo+FJSF8KKvtxxVYZ7MTP/5KoVpAU3aSWg==" + "version": "4.9.6", + "resolved": "https://registry.npmjs.org/@openzeppelin/contracts/-/contracts-4.9.6.tgz", + "integrity": "sha512-xSmezSupL+y9VkHZJGDoCBpmnB2ogM13ccaYDWqJTfS3dbuHkgjuwDFUmaFauBCboQMGB/S5UqUl2y54X99BmA==" }, "node_modules/@polkadot/api": { "version": "9.14.2", @@ -10563,12 +10563,14 @@ } }, "node_modules/es5-ext": { - "version": "0.10.62", + "version": "0.10.64", + "resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.64.tgz", + "integrity": "sha512-p2snDhiLaXe6dahss1LddxqEm+SkuDvV8dnIQG0MWjyHpcMNfXKPE+/Cc0y+PhxJX3A4xGNeFCj5oc0BUh6deg==", "hasInstallScript": true, - "license": "ISC", "dependencies": { "es6-iterator": "^2.0.3", "es6-symbol": "^3.1.3", + "esniff": "^2.0.1", "next-tick": "^1.1.0" }, "engines": { @@ -11016,6 +11018,25 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/esniff": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/esniff/-/esniff-2.0.1.tgz", + "integrity": "sha512-kTUIGKQ/mDPFoJ0oVfcmyJn4iBDRptjNVIzwIFR7tqWXdVI9xfA2RMwY/gbSpJG3lkdWNEjLap/NqVHZiJsdfg==", + "dependencies": { + "d": "^1.0.1", + "es5-ext": "^0.10.62", + "event-emitter": "^0.3.5", + "type": "^2.7.2" + }, + "engines": { + "node": ">=0.10" + } + }, + "node_modules/esniff/node_modules/type": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/type/-/type-2.7.2.tgz", + "integrity": "sha512-dzlvlNlt6AXU7EBSfpAscydQ7gXB+pPGsPnfJnZpiNJBDj7IaJzQlBZYGdEi4R9HmPdBv2XmWJ6YUtoTa7lmCw==" + }, "node_modules/espree": { "version": "9.6.1", "license": "BSD-2-Clause", @@ -11287,6 +11308,15 @@ "npm": ">=3" } }, + "node_modules/event-emitter": { + "version": "0.3.5", + "resolved": "https://registry.npmjs.org/event-emitter/-/event-emitter-0.3.5.tgz", + "integrity": "sha512-D9rRn9y7kLPnJ+hMq7S/nhvoKwwvVJahBi2BPmx3bvbsEdK3W9ii8cBSGjP+72/LnM4n6fo3+dkCX5FeTQruXA==", + "dependencies": { + "d": "1", + "es5-ext": "~0.10.14" + } + }, "node_modules/event-target-shim": { "version": "5.0.1", "license": "MIT", @@ -24837,9 +24867,9 @@ "optional": true }, "@openzeppelin/contracts": { - "version": "4.9.5", - "resolved": "https://registry.npmjs.org/@openzeppelin/contracts/-/contracts-4.9.5.tgz", - "integrity": "sha512-ZK+W5mVhRppff9BE6YdR8CC52C8zAvsVAiWhEtQ5+oNxFE6h1WdeWo+FJSF8KKvtxxVYZ7MTP/5KoVpAU3aSWg==" + "version": "4.9.6", + "resolved": "https://registry.npmjs.org/@openzeppelin/contracts/-/contracts-4.9.6.tgz", + "integrity": "sha512-xSmezSupL+y9VkHZJGDoCBpmnB2ogM13ccaYDWqJTfS3dbuHkgjuwDFUmaFauBCboQMGB/S5UqUl2y54X99BmA==" }, "@polkadot/api": { "version": "9.14.2", @@ -29049,10 +29079,13 @@ } }, "es5-ext": { - "version": "0.10.62", + "version": "0.10.64", + "resolved": "https://registry.npmjs.org/es5-ext/-/es5-ext-0.10.64.tgz", + "integrity": "sha512-p2snDhiLaXe6dahss1LddxqEm+SkuDvV8dnIQG0MWjyHpcMNfXKPE+/Cc0y+PhxJX3A4xGNeFCj5oc0BUh6deg==", "requires": { "es6-iterator": "^2.0.3", "es6-symbol": "^3.1.3", + "esniff": "^2.0.1", "next-tick": "^1.1.0" } }, @@ -29355,6 +29388,24 @@ "eslint-visitor-keys": { "version": "3.4.3" }, + "esniff": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/esniff/-/esniff-2.0.1.tgz", + "integrity": "sha512-kTUIGKQ/mDPFoJ0oVfcmyJn4iBDRptjNVIzwIFR7tqWXdVI9xfA2RMwY/gbSpJG3lkdWNEjLap/NqVHZiJsdfg==", + "requires": { + "d": "^1.0.1", + "es5-ext": "^0.10.62", + "event-emitter": "^0.3.5", + "type": "^2.7.2" + }, + "dependencies": { + "type": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/type/-/type-2.7.2.tgz", + "integrity": "sha512-dzlvlNlt6AXU7EBSfpAscydQ7gXB+pPGsPnfJnZpiNJBDj7IaJzQlBZYGdEi4R9HmPdBv2XmWJ6YUtoTa7lmCw==" + } + } + }, "espree": { "version": "9.6.1", "requires": { @@ -29573,6 +29624,15 @@ "strip-hex-prefix": "1.0.0" } }, + "event-emitter": { + "version": "0.3.5", + "resolved": "https://registry.npmjs.org/event-emitter/-/event-emitter-0.3.5.tgz", + "integrity": "sha512-D9rRn9y7kLPnJ+hMq7S/nhvoKwwvVJahBi2BPmx3bvbsEdK3W9ii8cBSGjP+72/LnM4n6fo3+dkCX5FeTQruXA==", + "requires": { + "d": "1", + "es5-ext": "~0.10.14" + } + }, "event-target-shim": { "version": "5.0.1" }, diff --git a/package.json b/package.json index 99fc299236..73b34eff3b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.2.2", + "version": "6.2.3", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/common/send-transaction-command.js b/src/commands/common/send-transaction-command.js new file mode 100644 index 0000000000..12694d731e --- /dev/null +++ b/src/commands/common/send-transaction-command.js @@ -0,0 +1,156 @@ +import Command from '../command.js'; +import { EXPECTED_TRANSACTION_ERRORS, OPERATION_ID_STATUS } from '../../constants/constants.js'; + +class SendTransactionCommand extends Command { + async sendTransactionAndHandleResult(transactionCompletePromise, data, command) { + const { + blockchain, + agreementId, + epoch, + operationId, + closestNode, + leftNeighborhoodEdge, + rightNeighborhoodEdge, + contract, + tokenId, + keyword, + hashFunctionId, + stateIndex, + txGasPrice, + } = data; + const sendTransactionOperationId = this.operationIdService.generateId(); + let txSuccess; + let msgBase; + try { + this.operationIdService.emitChangeEvent( + this.txStartStatus, + sendTransactionOperationId, + blockchain, + agreementId, + epoch, + operationId, + ); + txSuccess = await transactionCompletePromise; + } catch (error) { + this.logger.warn( + `Failed to execute ${command.name}, Error Message: ${error.message} for the Service Agreement ` + + `with the ID: ${agreementId}, Blockchain: ${blockchain}, Contract: ${contract}, ` + + `Token ID: ${tokenId}, Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, ` + + `Epoch: ${epoch}, State Index: ${stateIndex}, Operation ID: ${operationId}, ` + + `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + + `Right neighborhood edge: ${rightNeighborhoodEdge}, ` + + `Retry number: ${this.commandRetryNumber - command.retries + 1}.`, + ); + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.FAILED, + sendTransactionOperationId, + blockchain, + error.message, + this.txErrorType, + ); + txSuccess = false; + if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NODE_ALREADY_SUBMITTED_COMMIT)) { + msgBase = 'Node has already submitted commit. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NODE_ALREADY_REWARDED)) { + msgBase = 'Node already rewarded. Finishing'; + } else if ( + error.message.includes(EXPECTED_TRANSACTION_ERRORS.SERVICE_AGREEMENT_DOESNT_EXIST) + ) { + msgBase = 'Service agreement doesnt exist. Finishing'; + } else if ( + error.message.includes( + EXPECTED_TRANSACTION_ERRORS.INVALID_PROXIMITY_SCORE_FUNCTIONS_PAIR_ID, + ) + ) { + msgBase = 'Invalid proximity score functions pair id. Finishing'; + } else if ( + error.message.includes(EXPECTED_TRANSACTION_ERRORS.INVALID_SCORE_FUNCTION_ID) + ) { + msgBase = 'Invalid score function id. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.COMMIT_WINDOW_CLOSED)) { + msgBase = 'Commit window closed. Finishing'; + } else if ( + error.message.includes(EXPECTED_TRANSACTION_ERRORS.NODE_NOT_IN_SHARDING_TABLE) + ) { + msgBase = 'Node not in sharding table. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.PROOF_WINDOW_CLOSED)) { + msgBase = 'Proof window closed. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NODE_NOT_AWARDED)) { + msgBase = 'Node not awarded. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.WRONG_MERKLE_PROOF)) { + msgBase = 'Wrong merkle proof. Finishing'; + } else if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.INSUFFICIENT_FUNDS)) { + msgBase = 'Insufficient funds. Finishing'; + if (this.insufficientFundsErrorReceived) { + await this.insufficientFundsErrorReceived(command.data); + } + } else { + let newGasPrice; + if ( + error.message.includes(EXPECTED_TRANSACTION_ERRORS.TIMEOUT_EXCEEDED) || + error.message.includes(EXPECTED_TRANSACTION_ERRORS.TOO_LOW_PRIORITY) + ) { + newGasPrice = Math.ceil(txGasPrice * this.txGasIncreaseFactor); + } else { + newGasPrice = null; + } + + Object.assign(command, { + data: { ...command.data, gasPrice: newGasPrice }, + message: error.message, + }); + + return Command.retry(); + } + } + + if (txSuccess) { + this.operationIdService.emitChangeEvent( + this.txEndStatus, + sendTransactionOperationId, + blockchain, + agreementId, + epoch, + operationId, + ); + msgBase = 'Successfully executed'; + + this.operationIdService.emitChangeEvent( + this.operationEndStatus, + operationId, + blockchain, + agreementId, + epoch, + ); + } + + this.logger.trace( + `${msgBase} ${command.name} for the Service Agreement with the ID: ${agreementId}, ` + + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + + `State Index: ${stateIndex}, Operation ID: ${operationId}, ` + + `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + + `Right neighborhood edge: ${rightNeighborhoodEdge}, ` + + `Retry number: ${this.commandRetryNumber - command.retries + 1}`, + ); + + return Command.empty(); + } + + /** + * Builds default sendTransactionCommand + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'sendTransactionCommand', + delay: 0, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +export default SendTransactionCommand; diff --git a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js index 2807d0da0a..8cc0e6a5e5 100644 --- a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js +++ b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js @@ -8,6 +8,7 @@ import { ERROR_TYPE, TRIPLE_STORE_REPOSITORIES, SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS, + SERVICE_AGREEMENT_SOURCES, } from '../../../../constants/constants.js'; class BlockchainEpochCheckCommand extends Command { @@ -119,6 +120,8 @@ class BlockchainEpochCheckCommand extends Command { blockchain, ); + if (peerRecord == null) return; + const ask = this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); const timestamp = await this.blockchainModuleManager.getBlockchainTimestamp(blockchain); @@ -127,7 +130,7 @@ class BlockchainEpochCheckCommand extends Command { timestamp, blockchain, commitWindowDurationPerc, - SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS, + SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS[process.env.NODE_ENV], ); this.logger.info( `Epoch check: Found ${eligibleAgreementForSubmitCommit.length} eligible agreements for submit commit for blockchain: ${blockchain}`, @@ -135,40 +138,77 @@ class BlockchainEpochCheckCommand extends Command { const scheduleSubmitCommitCommands = []; const updateServiceAgreementsLastCommitEpoch = []; for (const serviceAgreement of eligibleAgreementForSubmitCommit) { - if (scheduleSubmitCommitCommands.length >= maxTransactions) { - this.logger.warn( - `Epoch check: not scheduling new commits. Submit commit command length: ${scheduleSubmitCommitCommands.length}, max number of transactions: ${maxTransactions} for blockchain: ${blockchain}`, - ); - break; - } + try { + if (scheduleSubmitCommitCommands.length >= maxTransactions) { + this.logger.warn( + `Epoch check: not scheduling new commits. Submit commit command length: ${scheduleSubmitCommitCommands.length}, max number of transactions: ${maxTransactions} for blockchain: ${blockchain}`, + ); + break; + } - const neighbourhood = await this.shardingTableService.findNeighbourhood( - blockchain, - serviceAgreement.keyword, - r2, - serviceAgreement.hashFunctionId, - serviceAgreement.scoreFunctionId, - ); + if (serviceAgreement.scoreFunctionId === 0) { + // corrupted service agreement data fetch new and store + const blockchainAgreementData = + await this.blockchainModuleManager.getAgreementData( + blockchain, + serviceAgreement.agreementId, + ); + if (!blockchainAgreementData) { + this.logger.warn( + `Epoch check: Unable to fetch agreement data for agreement id: ${serviceAgreement.agreementId}, blockchain id: ${blockchain}. Agreement will be retried in next epoch check command.`, + ); + continue; + } + await this.repositoryModuleManager.updateServiceAgreementRecord( + blockchain, + serviceAgreement.assetStorageContractAddress, + serviceAgreement.tokenId, + serviceAgreement.agreementId, + blockchainAgreementData.startTime, + serviceAgreement.epochsNumber, + serviceAgreement.epochLength, + blockchainAgreementData.scoreFunctionId, + blockchainAgreementData.proofWindowOffsetPerc, + serviceAgreement.hashFunctionId, + serviceAgreement.keyword, + serviceAgreement.assertionId, + serviceAgreement.stateIndex, + SERVICE_AGREEMENT_SOURCES.BLOCKCHAIN, + serviceAgreement.lastCommitEpoch, + serviceAgreement.lastProofEpoch, + ); + serviceAgreement.startTime = blockchainAgreementData.startTime; + serviceAgreement.scoreFunctionId = blockchainAgreementData.scoreFunctionId; + serviceAgreement.proofWindowOffsetPerc = + blockchainAgreementData.proofWindowOffsetPerc; + } - let neighbourhoodEdges = null; - if (serviceAgreement.scoreFunctionId === 2) { - neighbourhoodEdges = await this.shardingTableService.getNeighboorhoodEdgeNodes( - neighbourhood, + const neighbourhood = await this.shardingTableService.findNeighbourhood( blockchain, + serviceAgreement.keyword, + r2, serviceAgreement.hashFunctionId, serviceAgreement.scoreFunctionId, - serviceAgreement.keyword, ); - } - if (!neighbourhoodEdges && serviceAgreement.scoreFunctionId === 2) { - this.logger.warn( - `Epoch check: unable to find neighbourhood edges for agreement id: ${serviceAgreement.agreementId} for blockchain: ${blockchain}`, - ); - continue; - } + let neighbourhoodEdges = null; + if (serviceAgreement.scoreFunctionId === 2) { + neighbourhoodEdges = await this.shardingTableService.getNeighboorhoodEdgeNodes( + neighbourhood, + blockchain, + serviceAgreement.hashFunctionId, + serviceAgreement.scoreFunctionId, + serviceAgreement.keyword, + ); + } + + if (!neighbourhoodEdges && serviceAgreement.scoreFunctionId === 2) { + this.logger.warn( + `Epoch check: unable to find neighbourhood edges for agreement id: ${serviceAgreement.agreementId} for blockchain: ${blockchain}`, + ); + continue; + } - try { const rank = await this.serviceAgreementService.calculateRank( blockchain, serviceAgreement.keyword, @@ -223,6 +263,12 @@ class BlockchainEpochCheckCommand extends Command { blockchain, serviceAgreement.agreementId, ); + if (!agreementData) { + this.logger.warn( + `Unable to fetch agreement data in blockchain epoch check command for agreement id: ${serviceAgreement.agreementId}. Skipping scheduling submit commit command for blockchain: ${blockchain}`, + ); + continue; + } const blockchainAssertionSize = await this.blockchainModuleManager.getAssertionSize( blockchain, diff --git a/src/commands/protocols/common/epoch-check/epoch-check-command.js b/src/commands/protocols/common/epoch-check/epoch-check-command.js index f839a22090..ef9cb82087 100644 --- a/src/commands/protocols/common/epoch-check/epoch-check-command.js +++ b/src/commands/protocols/common/epoch-check/epoch-check-command.js @@ -32,6 +32,7 @@ class EpochCheckCommand extends Command { blockchain, operationId, }; + return this.commandExecutor.add({ name: 'blockchainEpochCheckCommand', data: commandData, diff --git a/src/commands/protocols/common/handle-protocol-message-command.js b/src/commands/protocols/common/handle-protocol-message-command.js index 223a5e472d..4477362614 100644 --- a/src/commands/protocols/common/handle-protocol-message-command.js +++ b/src/commands/protocols/common/handle-protocol-message-command.js @@ -96,7 +96,7 @@ class HandleProtocolMessageCommand extends Command { assertionId, operationId, ) { - const geAgreementData = async () => { + const getAgreementData = async () => { const agreementId = this.serviceAgreementService.generateId( blockchain, contract, @@ -130,12 +130,22 @@ class HandleProtocolMessageCommand extends Command { const [{ agreementId, agreementData }, blockchainAssertionSize, r0, ask] = await Promise.all([ - geAgreementData(), + getAgreementData(), this.blockchainModuleManager.getAssertionSize(blockchain, assertionId), this.blockchainModuleManager.getR0(blockchain), getAsk(), ]); const blockchainAssertionSizeInKb = blockchainAssertionSize / BYTES_IN_KILOBYTE; + if (!agreementData) { + this.logger.warn( + `Unable to fetch agreement data in handle protocol messsage command for agreement id: ${agreementId}, blockchain id: ${blockchain}`, + ); + return { + errorMessage: 'Unable to fetch agreement data.', + agreementId, + agreementData, + }; + } if (blockchainAssertionSizeInKb > this.config.maximumAssertionSizeInKb) { this.logger.warn( `The size of the received assertion exceeds the maximum limit allowed.. Maximum allowed assertion size in kb: ${this.config.maximumAssertionSizeInKb}, assertion size read from blockchain in kb: ${blockchainAssertionSizeInKb}`, diff --git a/src/commands/protocols/common/submit-commit-command.js b/src/commands/protocols/common/submit-commit-command.js index 899be2e4f2..71458ae2f0 100644 --- a/src/commands/protocols/common/submit-commit-command.js +++ b/src/commands/protocols/common/submit-commit-command.js @@ -5,15 +5,24 @@ import { COMMAND_TX_GAS_INCREASE_FACTORS, } from '../../../constants/constants.js'; import Command from '../../command.js'; +import SendTransactionCommand from '../../common/send-transaction-command.js'; -class SubmitCommitCommand extends Command { +class SubmitCommitCommand extends SendTransactionCommand { constructor(ctx) { super(ctx); this.commandExecutor = ctx.commandExecutor; this.blockchainModuleManager = ctx.blockchainModuleManager; this.operationIdService = ctx.operationIdService; + this.repositoryModuleManager = ctx.repositoryModuleManager; this.errorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_COMMIT_ERROR; + + this.txStartStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_START; + this.txEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_END; + this.txErrorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_ERROR; + this.txGasIncreaseFactor = COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_COMMIT; + this.operationEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_END; + this.commandRetryNumber = COMMAND_RETRIES.SUBMIT_COMMIT; } async execute(command) { @@ -109,107 +118,40 @@ class SubmitCommitCommand extends Command { stateIndex, (result) => { if (result?.error) { - if (result.error.message.includes('NodeAlreadySubmittedCommit')) { - resolve(false); - } else { - reject(result.error); - } + reject(result.error); } - resolve(true); }, txGasPrice, ); }); - const sendSubmitCommitTransactionOperationId = this.operationIdService.generateId(); - let txSuccess; - try { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_START, - sendSubmitCommitTransactionOperationId, + return this.sendTransactionAndHandleResult( + transactionCompletePromise, + { blockchain, agreementId, epoch, operationId, - ); - txSuccess = await transactionCompletePromise; - } catch (error) { - this.logger.warn( - `Failed to execute ${command.name}, Error Message: ${error.message} for the Service Agreement ` + - `with the ID: ${agreementId}, Blockchain: ${blockchain}, Contract: ${contract}, ` + - `Token ID: ${tokenId}, Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, ` + - `Epoch: ${epoch}, State Index: ${stateIndex}, Operation ID: ${operationId}, ` + - `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + - `Right neighborhood edge: ${rightNeighborhoodEdge}, ` + - `Retry number: ${COMMAND_RETRIES.SUBMIT_COMMIT - command.retries + 1}.`, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - sendSubmitCommitTransactionOperationId, - blockchain, - error.message, - ERROR_TYPE.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_ERROR, - ); - let newGasPrice; - if ( - error.message.includes(`timeout exceeded`) || - error.message.includes(`Pool(TooLowPriority`) - ) { - newGasPrice = Math.ceil(txGasPrice * COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_COMMIT); - } else { - newGasPrice = null; - } - - Object.assign(command, { - data: { ...command.data, gasPrice: newGasPrice }, - message: error.message, - }); - - return Command.retry(); - } - - let msgBase; - if (txSuccess) { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_END, - sendSubmitCommitTransactionOperationId, - blockchain, - agreementId, - epoch, - operationId, - ); - msgBase = 'Successfully executed'; - - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_END, - operationId, - blockchain, - agreementId, - epoch, - ); - } else { - msgBase = 'Node has already submitted commit. Finishing'; - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - sendSubmitCommitTransactionOperationId, - blockchain, - msgBase, - ERROR_TYPE.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_ERROR, - ); - } - - this.logger.trace( - `${msgBase} ${command.name} for the Service Agreement with the ID: ${agreementId}, ` + - `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + - `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + - `State Index: ${stateIndex}, Operation ID: ${operationId}, ` + - `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + - `Right neighborhood edge: ${rightNeighborhoodEdge}, ` + - `Retry number: ${COMMAND_RETRIES.SUBMIT_COMMIT - command.retries + 1}`, + closestNode, + leftNeighborhoodEdge, + rightNeighborhoodEdge, + contract, + tokenId, + keyword, + hashFunctionId, + stateIndex, + txGasPrice, + }, + command, ); + } - return Command.empty(); + async insufficientFundsErrorReceived(commandData) { + await this.repositoryModuleManager.updateServiceAgreementLastCommitEpoch( + commandData.agreementId, + commandData.epoch - 1 < 0 ? null : commandData.epoch - 1, + ); } async commitAlreadySubmitted(blockchain, agreementId, epoch, stateIndex) { diff --git a/src/commands/protocols/common/submit-proofs-command.js b/src/commands/protocols/common/submit-proofs-command.js index 2f77239ff4..57b4b7d028 100644 --- a/src/commands/protocols/common/submit-proofs-command.js +++ b/src/commands/protocols/common/submit-proofs-command.js @@ -6,8 +6,9 @@ import { TRIPLE_STORE_REPOSITORIES, } from '../../../constants/constants.js'; import Command from '../../command.js'; +import SendTransactionCommand from '../../common/send-transaction-command.js'; -class SubmitProofsCommand extends Command { +class SubmitProofsCommand extends SendTransactionCommand { constructor(ctx) { super(ctx); @@ -19,6 +20,13 @@ class SubmitProofsCommand extends Command { this.repositoryModuleManager = ctx.repositoryModuleManager; this.errorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_PROOFS_ERROR; + + this.txStartStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_START; + this.txEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_END; + this.txErrorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_ERROR; + this.txGasIncreaseFactor = COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_PROOFS; + this.operationEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_END; + this.commandRetryNumber = COMMAND_RETRIES.SUBMIT_PROOFS; } async execute(command) { @@ -149,11 +157,7 @@ class SubmitProofsCommand extends Command { stateIndex, (result) => { if (result?.error) { - if (result.error.message.includes('NodeAlreadyRewarded')) { - resolve(false); - } else { - reject(result.error); - } + reject(result.error); } resolve(true); @@ -161,88 +165,22 @@ class SubmitProofsCommand extends Command { txGasPrice, ); }); - const sendSubmitProofsTransactionOperationId = this.operationIdService.generateId(); - let txSuccess; - try { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_START, - sendSubmitProofsTransactionOperationId, - blockchain, - agreementId, - epoch, - ); - txSuccess = await transactionCompletePromise; - } catch (error) { - this.logger.warn( - `Failed to execute ${command.name}, Error Message: ${error.message} for the Service Agreement ` + - `with the ID: ${agreementId}, Blockchain: ${blockchain}, Contract: ${contract}, ` + - `Token ID: ${tokenId}, Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, ` + - `Epoch: ${epoch}, State Index: ${stateIndex}, Operation ID: ${operationId}, ` + - `Retry number: ${COMMAND_RETRIES.SUBMIT_PROOFS - command.retries + 1}.`, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - sendSubmitProofsTransactionOperationId, - blockchain, - error.message, - ERROR_TYPE.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_ERROR, - ); - let newGasPrice; - if ( - error.message.includes(`timeout exceeded`) || - error.message.includes(`Pool(TooLowPriority`) - ) { - newGasPrice = Math.ceil(txGasPrice * COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_PROOFS); - } else { - newGasPrice = null; - } - - Object.assign(command, { - data: { ...command.data, gasPrice: newGasPrice }, - message: error.message, - }); - - return Command.retry(); - } - - let msgBase; - if (txSuccess) { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_SEND_TX_START, - sendSubmitProofsTransactionOperationId, + return this.sendTransactionAndHandleResult( + transactionCompletePromise, + { blockchain, agreementId, epoch, - ); - msgBase = 'Successfully executed'; - - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_END, operationId, - blockchain, - agreementId, - epoch, - ); - } else { - msgBase = 'Node has already sent proof. Finishing'; - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - sendSubmitProofsTransactionOperationId, - blockchain, - msgBase, - ERROR_TYPE.COMMIT_PROOF.SUBMIT_COMMIT_SEND_TX_ERROR, - ); - } - - this.logger.trace( - `${msgBase} ${command.name} for the Service Agreement with the ID: ${agreementId}, ` + - `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + - `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + - `State Index: ${stateIndex}, Operation ID: ${operationId}, ` + - `Retry number: ${COMMAND_RETRIES.SUBMIT_PROOFS - command.retries + 1}`, + contract, + tokenId, + keyword, + hashFunctionId, + stateIndex, + txGasPrice, + }, + command, ); - - return Command.empty(); } async proofAlreadySubmitted(blockchain, agreementId, epoch, stateIndex) { @@ -263,6 +201,13 @@ class SubmitProofsCommand extends Command { return false; } + async insufficientFundsErrorReceived(commandData) { + await this.repositoryModuleManager.updateServiceAgreementLastProofEpoch( + commandData.agreementId, + commandData.epoch - 1 < 0 ? null : commandData.epoch - 1, + ); + } + async retryFinished(command) { const { blockchain, operationId } = command.data; await this.handleError( diff --git a/src/commands/protocols/get/sender/get-assertion-id-command.js b/src/commands/protocols/get/sender/get-assertion-id-command.js index f21a985688..beea10c2ce 100644 --- a/src/commands/protocols/get/sender/get-assertion-id-command.js +++ b/src/commands/protocols/get/sender/get-assertion-id-command.js @@ -154,13 +154,20 @@ class GetAssertionIdCommand extends Command { let agreementData; agreementData = await this.repositoryModuleManager.getServiceAgreementRecord(agreementId); - if (!agreementData) { + if (!agreementData || agreementData.scoreFunctionId === 0) { agreementData = await this.blockchainModuleManager.getAgreementData( blockchain, agreementId, ); } + if (!agreementData || agreementData.scoreFunctionId === 0) { + this.logger.warn( + `Unable to fetch agreement data in get assertion id command ${agreementId}, blockchain id: ${blockchain}`, + ); + throw Error(`Unable to get agreement data`); + } + const epoch = await this.serviceAgreementService.calculateCurrentEpoch( agreementData.startTime, agreementData.epochLength, diff --git a/src/commands/protocols/publish/sender/publish-schedule-messages-command.js b/src/commands/protocols/publish/sender/publish-schedule-messages-command.js index 1013099ed7..ff0f7ebdb0 100644 --- a/src/commands/protocols/publish/sender/publish-schedule-messages-command.js +++ b/src/commands/protocols/publish/sender/publish-schedule-messages-command.js @@ -76,6 +76,16 @@ class PublishScheduleMessagesCommand extends ProtocolScheduleMessagesCommand { agreementId, ); + if (!agreementData) { + await this.operationService.markOperationAsFailed( + operationId, + blockchain, + 'Unable to fetch agreement data.', + ERROR_TYPE.PUBLISH.PUBLISH_START_ERROR, + ); + return false; + } + const r0 = await this.blockchainModuleManager.getR0(); const serviceAgreementBid = await this.serviceAgreementService.calculateBid( diff --git a/src/commands/protocols/update/receiver/submit-update-commit-command.js b/src/commands/protocols/update/receiver/submit-update-commit-command.js index 21678393b3..c2bf7eb5a0 100644 --- a/src/commands/protocols/update/receiver/submit-update-commit-command.js +++ b/src/commands/protocols/update/receiver/submit-update-commit-command.js @@ -6,8 +6,9 @@ import { COMMAND_TX_GAS_INCREASE_FACTORS, CONTRACT_FUNCTION_FIXED_GAS_PRICE, } from '../../../../constants/constants.js'; +import SendTransactionCommand from '../../../common/send-transaction-command.js'; -class SubmitUpdateCommitCommand extends Command { +class SubmitUpdateCommitCommand extends SendTransactionCommand { constructor(ctx) { super(ctx); this.commandExecutor = ctx.commandExecutor; @@ -16,6 +17,13 @@ class SubmitUpdateCommitCommand extends Command { this.serviceAgreementService = ctx.serviceAgreementService; this.errorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_ERROR; + + this.txStartStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_START; + this.txEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_END; + this.txErrorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_ERROR; + this.txGasIncreaseFactor = COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_UPDATE_COMMIT; + this.operationEndStatus = OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_END; + this.commandRetryNumber = COMMAND_RETRIES.SUBMIT_UPDATE_COMMIT; } async execute(command) { @@ -96,88 +104,30 @@ class SubmitUpdateCommitCommand extends Command { if (result?.error) { reject(result.error); } - - resolve(); + resolve(true); }, txGasPrice, ); }); - const sendSubmitUpdateCommitTransactionOperationId = this.operationIdService.generateId(); - try { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_START, - sendSubmitUpdateCommitTransactionOperationId, - blockchain, - agreementId, - epoch, - ); - await transactionCompletePromise; - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_END, - sendSubmitUpdateCommitTransactionOperationId, + return this.sendTransactionAndHandleResult( + transactionCompletePromise, + { blockchain, agreementId, epoch, - ); - } catch (error) { - this.logger.warn( - `Failed to execute ${command.name}, Error Message: ${error.message} for the Service Agreement ` + - `with the ID: ${agreementId}, Blockchain: ${blockchain}, Contract: ${contract}, ` + - `Token ID: ${tokenId}, Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, ` + - `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + - `Right neighborhood edge: ${rightNeighborhoodEdge}, `, - +`Epoch: ${epoch}, Operation ID: ${operationId}, Retry number: ${ - COMMAND_RETRIES.SUBMIT_UPDATE_COMMIT - command.retries + 1 - }.`, - ); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.FAILED, - sendSubmitUpdateCommitTransactionOperationId, - blockchain, - error.message, - ERROR_TYPE.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_SEND_TX_ERROR, - ); - let newGasPrice; - if ( - error.message.includes(`timeout exceeded`) || - error.message.includes(`Pool(TooLowPriority`) - ) { - newGasPrice = Math.ceil( - txGasPrice * COMMAND_TX_GAS_INCREASE_FACTORS.SUBMIT_UPDATE_COMMIT, - ); - } else { - newGasPrice = null; - } - - Object.assign(command, { - data: { ...command.data, gasPrice: newGasPrice }, - message: error.message, - }); - - return Command.retry(); - } - - this.logger.trace( - `Successfully executed ${command.name} for the Service Agreement with the ID: ${agreementId}, ` + - `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + - `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + - `Closest Node: ${closestNode}, Left neighborhood edge: ${leftNeighborhoodEdge}, ` + - `Right neighborhood edge: ${rightNeighborhoodEdge}, `, - +`Operation ID: ${operationId}, Retry number: ${ - COMMAND_RETRIES.SUBMIT_UPDATE_COMMIT - command.retries + 1 - }`, - ); - - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_END, - operationId, - blockchain, - agreementId, - epoch, + operationId, + closestNode, + leftNeighborhoodEdge, + rightNeighborhoodEdge, + contract, + tokenId, + keyword, + hashFunctionId, + txGasPrice, + }, + command, ); - - return Command.empty(); } async retryFinished(command) { diff --git a/src/constants/constants.js b/src/constants/constants.js index 842a1e32e4..327757a3b8 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -439,7 +439,29 @@ export const OPERATIONS = { GET: 'get', }; -export const SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS = 5 * 60; +export const SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS = { + mainnet: 5 * 60, + testnet: 5 * 60, + devnet: 3 * 60, + test: 10, + development: 10, +}; + +export const EXPECTED_TRANSACTION_ERRORS = { + INSUFFICIENT_FUNDS: 'InsufficientFunds', + NODE_ALREADY_SUBMITTED_COMMIT: 'NodeAlreadySubmittedCommit', + TIMEOUT_EXCEEDED: 'timeout exceeded', + TOO_LOW_PRIORITY: 'TooLowPriority', + NODE_ALREADY_REWARDED: 'NodeAlreadyRewarded', + SERVICE_AGREEMENT_DOESNT_EXIST: 'ServiceAgreementDoesntExist', + INVALID_PROXIMITY_SCORE_FUNCTIONS_PAIR_ID: 'InvalidProximityScoreFunctionsPairId', + INVALID_SCORE_FUNCTION_ID: 'InvalidScoreFunctionId', + COMMIT_WINDOW_CLOSED: 'CommitWindowClosed', + NODE_NOT_IN_SHARDING_TABLE: 'NodeNotInShardingTable', + PROOF_WINDOW_CLOSED: 'ProofWindowClosed', + NODE_NOT_AWARDED: 'NodeNotAwarded', + WRONG_MERKLE_PROOF: 'WrongMerkleProof', +}; /** * @constant {number} OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS - diff --git a/src/migration/get-old-service-agreements-migration.js b/src/migration/get-old-service-agreements-migration.js index 871dd68ffb..dd6f33d65b 100644 --- a/src/migration/get-old-service-agreements-migration.js +++ b/src/migration/get-old-service-agreements-migration.js @@ -112,15 +112,15 @@ class GetOldServiceAgreementsMigration extends BaseMigration { assetStorageContractAddress: contract, tokenId: tokenIdToBeFetched, agreementId, - startTime: agreementData.startTime, - epochsNumber: agreementData.epochsNumber, - epochLength: agreementData.epochLength, - scoreFunctionId: agreementData.scoreFunctionId, + startTime: agreementData?.startTime ?? 0, + epochsNumber: agreementData?.epochsNumber ?? 0, + epochLength: agreementData?.epochLength ?? 0, + scoreFunctionId: agreementData?.scoreFunctionId ?? 0, stateIndex: 0, assertionId: assertionIds[0], hashFunctionId: 1, keyword, - proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc, + proofWindowOffsetPerc: agreementData?.proofWindowOffsetPerc ?? 0, dataSource: SERVICE_AGREEMENT_SOURCES.BLOCKCHAIN, }; } catch (error) { diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js index d829507702..0c84318a64 100644 --- a/src/migration/migration-executor.js +++ b/src/migration/migration-executor.js @@ -424,7 +424,7 @@ class MigrationExecutor { const serviceAgreementService = container.resolve('serviceAgreementService'); const migration = new GetOldServiceAgreementsMigration( - 'getOldServiceAgreementsMigration', + 'getOldServiceAgreementsMigrationv623', logger, config, repositoryModuleManager, diff --git a/src/modules/blockchain/implementation/gnosis/gnosis-service.js b/src/modules/blockchain/implementation/gnosis/gnosis-service.js index 431ef46466..5a83949d5b 100644 --- a/src/modules/blockchain/implementation/gnosis/gnosis-service.js +++ b/src/modules/blockchain/implementation/gnosis/gnosis-service.js @@ -12,6 +12,13 @@ class GnosisService extends Web3Service { this.baseTokenTicker = 'GNO'; this.tracTicker = 'TRAC'; + + this.defaultGasPrice = this.convertToWei( + process.env.NODE_ENV === NODE_ENVIRONMENTS.MAINNET + ? GNOSIS_DEFAULT_GAS_PRICE.MAINNET + : GNOSIS_DEFAULT_GAS_PRICE.TESTNET, + 'gwei', + ); } getBlockTimeMillis() { @@ -20,33 +27,32 @@ class GnosisService extends Web3Service { async getGasPrice() { let gasPrice; + try { const response = await axios.get(this.config.gasPriceOracleLink); if (response?.data?.average) { - // Returnts gwei + // returns gwei gasPrice = Number(response.data.average); this.logger.debug(`Gas price from Gnosis oracle link: ${gasPrice} gwei`); + gasPrice = this.convertToWei(gasPrice, 'gwei'); } else if (response?.data?.result) { - // Returns wei + // returns wei gasPrice = Number(response.data.result, 10); this.logger.debug(`Gas price from Gnosis oracle link: ${gasPrice} wei`); - return gasPrice; } else { - throw Error( - `Gas price oracle: ${this.config.gasPriceOracleLink} returns gas price in unsupported format.`, + this.logger.warn( + `Gas price oracle: ${this.config.gasPriceOracleLink} returns gas price in unsupported format. Using default value: ${this.defaultGasPrice} Gwei.`, ); } } catch (error) { - const defaultGasPrice = - process.NODE_ENV === NODE_ENVIRONMENTS.MAINNET - ? GNOSIS_DEFAULT_GAS_PRICE.MAINNET - : GNOSIS_DEFAULT_GAS_PRICE.TESTNET; this.logger.warn( - `Failed to fetch the gas price from the Gnosis: ${error}. Using default value: ${defaultGasPrice} Gwei.`, + `Failed to fetch the gas price from the Gnosis: ${error}. Using default value: ${this.defaultGasPrice} Gwei.`, ); - gasPrice = defaultGasPrice; } - return this.convertToWei(gasPrice, 'gwei'); + if (gasPrice) { + return gasPrice; + } + return this.defaultGasPrice; } async healthCheck() { @@ -59,6 +65,10 @@ class GnosisService extends Web3Service { } return false; } + + async getAgreementScoreFunctionId() { + return 2; + } } export default GnosisService; diff --git a/src/modules/blockchain/implementation/hardhat/hardhat-service.js b/src/modules/blockchain/implementation/hardhat/hardhat-service.js index e52efd54bb..315e52644c 100644 --- a/src/modules/blockchain/implementation/hardhat/hardhat-service.js +++ b/src/modules/blockchain/implementation/hardhat/hardhat-service.js @@ -24,6 +24,13 @@ class HardhatService extends Web3Service { async getGasPrice() { return this.convertToWei(20, 'wei'); } + + async getAgreementScoreFunctionId() { + if (this.getBlockchainId() === 'hardhat1:31337') { + return 1; + } + return 2; + } } export default HardhatService; diff --git a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js index 7ba9c69b4d..37ba3fa3c1 100644 --- a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js +++ b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js @@ -201,6 +201,10 @@ class OtParachainService extends Web3Service { }); return wallets; } + + async getAgreementScoreFunctionId() { + return 1; + } } export default OtParachainService; diff --git a/src/modules/blockchain/implementation/web3-service-validator.js b/src/modules/blockchain/implementation/web3-service-validator.js new file mode 100644 index 0000000000..e46699916a --- /dev/null +++ b/src/modules/blockchain/implementation/web3-service-validator.js @@ -0,0 +1,33 @@ +class Web3ServiceValidator { + static validateResult(functionName, contractName, result, logger) { + if (Web3ServiceValidator[`${functionName}Validator`]) { + logger.trace( + `Calling web3 service validator for function name: ${functionName}, contract: ${contractName}`, + ); + return Web3ServiceValidator[`${functionName}Validator`](result); + } + return true; + } + + static getAgreementDataValidator(result) { + if (!result) { + return false; + } + const agreementData = { + startTime: result['0'].toNumber(), + epochsNumber: result['1'], + epochLength: result['2'].toNumber(), + scoreFunctionId: result['4'][0], + proofWindowOffsetPerc: result['4'][1], + }; + return !( + agreementData.startTime === 0 && + agreementData.epochsNumber === 0 && + agreementData.epochLength === 0 && + agreementData.scoreFunctionId === 0 && + agreementData.proofWindowOffsetPerc === 0 + ); + } +} + +export default Web3ServiceValidator; diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 5a0fb63667..265fb44a78 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -1,3 +1,4 @@ +/* eslint-disable no-await-in-loop */ import { ethers, BigNumber } from 'ethers'; import axios from 'axios'; import async from 'async'; @@ -26,6 +27,7 @@ import { CONTRACT_FUNCTION_GAS_LIMIT_INCREASE_FACTORS, MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS, } from '../../../constants/constants.js'; +import Web3ServiceValidator from './web3-service-validator.js'; const require = createRequire(import.meta.url); @@ -95,6 +97,7 @@ class Web3Service { }, concurrency); this.transactionQueues[operationalWallet.address] = transactionQueue; } + this.transactionQueueOrder = Object.keys(this.transactionQueues); } queueTransaction(contractInstance, functionName, transactionArgs, callback, gasPrice) { @@ -141,24 +144,26 @@ class Web3Service { } selectTransactionQueue() { - let selectedQueue = null; - let minLength = Infinity; - - for (const walletAddress of Object.keys(this.transactionQueues)) { - const queue = this.transactionQueues[walletAddress]; - const length = queue.length(); - - if (length === 0) { - return queue; - } - - if (length < minLength) { - selectedQueue = queue; - minLength = length; - } + const queues = Object.keys(this.transactionQueues).map((wallet) => ({ + wallet, + length: this.transactionQueues[wallet].length(), + })); + const minLength = Math.min(...queues.map((queue) => queue.length)); + const shortestQueues = queues.filter((queue) => queue.length === minLength); + if (shortestQueues.length === 1) { + return this.transactionQueues[shortestQueues[0].wallet]; } - return selectedQueue; + const selectedQueueWallet = this.transactionQueueOrder.find((roundRobinNext) => + shortestQueues.some((shortestQueue) => shortestQueue.wallet === roundRobinNext), + ); + + this.transactionQueueOrder.push( + this.transactionQueueOrder + .splice(this.transactionQueueOrder.indexOf(selectedQueueWallet), 1) + .pop(), + ); + return this.transactionQueues[selectedQueueWallet]; } getValidOperationalWallets() { @@ -596,12 +601,30 @@ class Web3Service { } async callContractFunction(contractInstance, functionName, args, contractName = null) { + const maxNumberOfRetries = 3; + const retryDelayInSec = 12; + let retryCount = 0; let result = this.getContractCallCache(contractName, functionName); try { if (!result) { - // eslint-disable-next-line no-await-in-loop - result = await contractInstance[functionName](...args); - this.setContractCallCache(contractName, functionName, result); + while (retryCount < maxNumberOfRetries) { + result = await contractInstance[functionName](...args); + const resultIsValid = Web3ServiceValidator.validateResult( + functionName, + contractName, + result, + this.logger, + ); + if (resultIsValid) { + this.setContractCallCache(contractName, functionName, result); + return result; + } + if (retryCount === maxNumberOfRetries - 1) { + return null; + } + await sleep(retryDelayInSec * 1000); + retryCount += 1; + } } } catch (error) { this._decodeContractCallError(contractInstance, functionName, error, args); @@ -1106,6 +1129,9 @@ class Web3Service { 'getAgreementData', [agreementId], ); + if (!result) { + return null; + } return { startTime: result['0'].toNumber(), epochsNumber: result['1'], diff --git a/src/modules/repository/implementation/sequelize/migrations/20240301095400-remove-corrupted-service-agreements-for-gnosis.js b/src/modules/repository/implementation/sequelize/migrations/20240301095400-remove-corrupted-service-agreements-for-gnosis.js new file mode 100644 index 0000000000..524cd7271a --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20240301095400-remove-corrupted-service-agreements-for-gnosis.js @@ -0,0 +1,21 @@ +import { NODE_ENVIRONMENTS } from '../../../../../constants/constants.js'; + +const MAINNET_GNOSIS_BLOCKCHAIN_ID = 'gnosis:100'; + +// eslint-disable-next-line import/prefer-default-export +export async function up({ context: { queryInterface } }) { + if (process.env.NODE_ENV === NODE_ENVIRONMENTS.MAINNET) { + await queryInterface.sequelize.query(` + delete + from service_agreement + where score_function_id = 0 + and blockchain_id = '${MAINNET_GNOSIS_BLOCKCHAIN_ID}' + `); + + await queryInterface.sequelize.query(` + update service_agreement + set last_commit_epoch = NULL + where blockchain_id = '${MAINNET_GNOSIS_BLOCKCHAIN_ID}' + `); + } +} diff --git a/src/modules/repository/implementation/sequelize/repositories/shard-repository.js b/src/modules/repository/implementation/sequelize/repositories/shard-repository.js index eb550bbe6f..945aa5810a 100644 --- a/src/modules/repository/implementation/sequelize/repositories/shard-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/shard-repository.js @@ -81,6 +81,7 @@ class ShardRepository { }, }, order: [['last_dialed', 'asc']], + group: ['peer_id', 'last_dialed'], limit, raw: true, }); diff --git a/src/service/blockchain-event-listener-service.js b/src/service/blockchain-event-listener-service.js index 6818420e95..005d209ceb 100644 --- a/src/service/blockchain-event-listener-service.js +++ b/src/service/blockchain-event-listener-service.js @@ -568,12 +568,16 @@ class BlockchainEventListenerService { `Skipping processing of asset created event, agreement data present in database for agreement id: ${agreementId} on blockchain ${blockchainId}`, ); } else { - // TODO: Remove when added to the event - const { scoreFunctionId, proofWindowOffsetPerc } = - await this.blockchainModuleManager.getAgreementData( - blockchainId, - agreementId, + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchainId, + agreementId, + ); + + if (!agreementData) { + this.logger.warn( + `Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchainId}`, ); + } await this.repositoryModuleManager.updateServiceAgreementRecord( blockchainId, @@ -583,8 +587,8 @@ class BlockchainEventListenerService { startTime, epochsNumber, epochLength, - scoreFunctionId, - proofWindowOffsetPerc, + agreementData?.scoreFunctionId ?? 0, + agreementData?.proofWindowOffsetPerc ?? 0, hashFunctionId, keyword, assertionId, diff --git a/src/service/sharding-table-service.js b/src/service/sharding-table-service.js index 9926672b01..77e3c62cec 100644 --- a/src/service/sharding-table-service.js +++ b/src/service/sharding-table-service.js @@ -235,23 +235,24 @@ class ShardingTableService { if (!this.memoryCachedPeerIds[peerId]) { this.memoryCachedPeerIds[peerId] = { - lastUpdated: 0, lastDialed: 0, lastSeen: 0, }; } - if (this.memoryCachedPeerIds[peerId].lastUpdated < timestampThreshold) { + if ( + this.memoryCachedPeerIds[peerId].lastSeen < timestampThreshold || + this.memoryCachedPeerIds[peerId].lastDialed < timestampThreshold + ) { const [rowsUpdated] = await this.repositoryModuleManager.updatePeerRecordLastSeenAndLastDialed( peerId, now, ); if (rowsUpdated) { - this.memoryCachedPeerIds[peerId].lastUpdated = now; + this.memoryCachedPeerIds[peerId].lastDialed = now; + this.memoryCachedPeerIds[peerId].lastSeen = now; } } - this.memoryCachedPeerIds[peerId].lastDialed = now; - this.memoryCachedPeerIds[peerId].lastSeen = now; } async updatePeerRecordLastDialed(peerId) { @@ -259,21 +260,19 @@ class ShardingTableService { const timestampThreshold = now - PEER_RECORD_UPDATE_DELAY; if (!this.memoryCachedPeerIds[peerId]) { this.memoryCachedPeerIds[peerId] = { - lastUpdated: 0, lastDialed: 0, lastSeen: 0, }; } - if (this.memoryCachedPeerIds[peerId].lastUpdated < timestampThreshold) { + if (this.memoryCachedPeerIds[peerId].lastDialed < timestampThreshold) { const [rowsUpdated] = await this.repositoryModuleManager.updatePeerRecordLastDialed( peerId, now, ); if (rowsUpdated) { - this.memoryCachedPeerIds[peerId].lastUpdated = now; + this.memoryCachedPeerIds[peerId].lastDialed = now; } } - this.memoryCachedPeerIds[peerId].lastDialed = now; } async findPeerAddressAndProtocols(peerId) {