diff --git a/ot-node.js b/ot-node.js index 05aab1138f..06a00c7bab 100644 --- a/ot-node.js +++ b/ot-node.js @@ -82,6 +82,12 @@ class OTNode { this.startTelemetryModule(); this.resumeCommandExecutor(); this.logger.info('Node is up and running!'); + + MigrationExecutor.executeGetOldServiceAgreementsMigration( + this.container, + this.logger, + this.config, + ); } checkNodeVersion() { diff --git a/package-lock.json b/package-lock.json index fa57218fd3..90f9944f5d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.2.1", + "version": "6.2.2", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.2.1", + "version": "6.2.2", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 6b30c31598..99fc299236 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.2.1", + "version": "6.2.2", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/command-executor.js b/src/commands/command-executor.js index 5f2e36f057..e61b90ded3 100644 --- a/src/commands/command-executor.js +++ b/src/commands/command-executor.js @@ -233,7 +233,7 @@ class CommandExecutor { * @private */ async _addDefaultCommand(name) { - await this._delete(name); + await this.delete(name); const handler = this.commandResolver.resolve(name); if (!handler) { this.logger.warn(`Command '${name}' will not be executed.`); @@ -376,9 +376,8 @@ class CommandExecutor { * Delete command from database * @param name * @returns {Promise} - * @private */ - async _delete(name) { + async delete(name) { await this.repositoryModuleManager.destroyCommand(name); } diff --git a/src/commands/protocols/common/epoch-check-command.js b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js similarity index 81% rename from src/commands/protocols/common/epoch-check-command.js rename to src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js index 4d48e32457..2807d0da0a 100644 --- a/src/commands/protocols/common/epoch-check-command.js +++ b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js @@ -1,5 +1,5 @@ /* eslint-disable no-await-in-loop */ -import Command from '../../command.js'; +import Command from '../../../command.js'; import { COMMAND_QUEUE_PARALLELISM, COMMAND_RETRIES, @@ -8,9 +8,9 @@ import { ERROR_TYPE, TRIPLE_STORE_REPOSITORIES, SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS, -} from '../../../constants/constants.js'; +} from '../../../../constants/constants.js'; -class EpochCheckCommand extends Command { +class BlockchainEpochCheckCommand extends Command { constructor(ctx) { super(ctx); this.commandExecutor = ctx.commandExecutor; @@ -24,76 +24,81 @@ class EpochCheckCommand extends Command { this.hashingService = ctx.hashingService; this.tripleStoreService = ctx.tripleStoreService; - this.errorType = ERROR_TYPE.COMMIT_PROOF.EPOCH_CHECK_ERROR; + this.errorType = ERROR_TYPE.COMMIT_PROOF.BLOCKCHAIN_EPOCH_CHECK_ERROR; } async execute(command) { - this.logger.info('Epoch check: Starting epoch check command'); - const operationId = this.operationIdService.generateId(); - - await Promise.all( - this.blockchainModuleManager.getImplementationNames().map(async (blockchain) => { - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.EPOCH_CHECK_START, - operationId, - blockchain, - ); + const { operationId, blockchain } = command.data; + this.logger.info( + `Epoch check: Starting blockchain epoch check command for ${blockchain} with operation id: ${operationId}`, + ); - const commitWindowDurationPerc = - await this.blockchainModuleManager.getCommitWindowDurationPerc(blockchain); - const proofWindowDurationPerc = - await this.blockchainModuleManager.getProofWindowDurationPerc(blockchain); - let totalTransactions = await this.calculateTotalTransactions( - blockchain, - commitWindowDurationPerc, - proofWindowDurationPerc, - command.period, - ); + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.COMMIT_PROOF.EPOCH_CHECK_START, + operationId, + blockchain, + ); - // We don't expect to have this many transactions in one epoch check window. - // This is just to make sure we don't schedule too many commands and block the queue - // TODO: find general solution for all commands scheduling blockchain transactions - totalTransactions = Math.min(totalTransactions, COMMAND_QUEUE_PARALLELISM * 0.3); + const commitWindowDurationPerc = + await this.blockchainModuleManager.getCommitWindowDurationPerc(blockchain); + const proofWindowDurationPerc = + await this.blockchainModuleManager.getProofWindowDurationPerc(blockchain); + let totalTransactions = await this.calculateTotalTransactions( + blockchain, + commitWindowDurationPerc, + proofWindowDurationPerc, + command.period, + ); - const transactionQueueLength = - this.blockchainModuleManager.getTotalTransactionQueueLength(blockchain); - if (transactionQueueLength >= totalTransactions) return; + // We don't expect to have this many transactions in one epoch check window. + // This is just to make sure we don't schedule too many commands and block the queue + // TODO: find general solution for all commands scheduling blockchain transactions + totalTransactions = Math.min(totalTransactions, COMMAND_QUEUE_PARALLELISM * 0.3); + + const transactionQueueLength = + this.blockchainModuleManager.getTotalTransactionQueueLength(blockchain); + if (transactionQueueLength >= totalTransactions) { + this.logger.debug( + `Epoch check: Current transaction queue length is ${transactionQueueLength}, ` + + `exceeding the maximum total transactions: ${totalTransactions} for ${blockchain}` + + `with operation id: ${operationId}`, + ); + return Command.repeat(); + } - totalTransactions -= transactionQueueLength; + totalTransactions -= transactionQueueLength; - const [r0, r2, totalNodesNumber, minStake, maxStake] = await Promise.all([ - this.blockchainModuleManager.getR0(blockchain), - this.blockchainModuleManager.getR2(blockchain), - this.repositoryModuleManager.getPeersCount(blockchain), - this.blockchainModuleManager.getMinimumStake(blockchain), - this.blockchainModuleManager.getMaximumStake(blockchain), - ]); + const [r0, r2, totalNodesNumber, minStake, maxStake] = await Promise.all([ + this.blockchainModuleManager.getR0(blockchain), + this.blockchainModuleManager.getR2(blockchain), + this.repositoryModuleManager.getPeersCount(blockchain), + this.blockchainModuleManager.getMinimumStake(blockchain), + this.blockchainModuleManager.getMaximumStake(blockchain), + ]); - await Promise.all([ - this.scheduleSubmitCommitCommands( - blockchain, - Math.floor(totalTransactions / 2), - commitWindowDurationPerc, - r0, - r2, - totalNodesNumber, - minStake, - maxStake, - ), - this.scheduleCalculateProofsCommands( - blockchain, - Math.ceil(totalTransactions / 2), - proofWindowDurationPerc, - r0, - ), - ]); + await Promise.all([ + this.scheduleSubmitCommitCommands( + blockchain, + Math.floor(totalTransactions / 2), + commitWindowDurationPerc, + r0, + r2, + totalNodesNumber, + minStake, + maxStake, + ), + this.scheduleCalculateProofsCommands( + blockchain, + Math.ceil(totalTransactions / 2), + proofWindowDurationPerc, + r0, + ), + ]); - this.operationIdService.emitChangeEvent( - OPERATION_ID_STATUS.COMMIT_PROOF.EPOCH_CHECK_END, - operationId, - blockchain, - ); - }), + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.COMMIT_PROOF.EPOCH_CHECK_END, + operationId, + blockchain, ); return Command.repeat(); @@ -440,13 +445,6 @@ class EpochCheckCommand extends Command { return transactionsPerEpochCheck * numberOfWallets; } - calculateCommandPeriod() { - const devEnvironment = - process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test'; - - return devEnvironment ? 30_000 : 120_000; - } - /** * Recover system from failure * @param command @@ -465,14 +463,13 @@ class EpochCheckCommand extends Command { */ default(map) { const command = { - name: 'epochCheckCommand', + name: 'blockchainEpochCheckCommand', data: {}, transactional: false, - period: this.calculateCommandPeriod(), }; Object.assign(command, map); return command; } } -export default EpochCheckCommand; +export default BlockchainEpochCheckCommand; diff --git a/src/commands/protocols/common/epoch-check/epoch-check-command.js b/src/commands/protocols/common/epoch-check/epoch-check-command.js new file mode 100644 index 0000000000..f839a22090 --- /dev/null +++ b/src/commands/protocols/common/epoch-check/epoch-check-command.js @@ -0,0 +1,74 @@ +import Command from '../../../command.js'; +import { ERROR_TYPE } from '../../../../constants/constants.js'; + +class EpochCheckCommand extends Command { + constructor(ctx) { + super(ctx); + this.commandExecutor = ctx.commandExecutor; + this.blockchainModuleManager = ctx.blockchainModuleManager; + + this.errorType = ERROR_TYPE.COMMIT_PROOF.EPOCH_CHECK_ERROR; + } + + calculateCommandPeriod() { + const devEnvironment = + process.env.NODE_ENV === 'development' || process.env.NODE_ENV === 'test'; + + return devEnvironment ? 30_000 : 120_000; + } + + async execute() { + const operationId = this.operationIdService.generateId(); + + this.logger.info( + `Epoch check: Starting epoch check command for operation id: ${operationId}`, + ); + + await this.commandExecutor.delete('blockchainEpochCheckCommand'); + + await Promise.all( + this.blockchainModuleManager.getImplementationNames().map(async (blockchain) => { + const commandData = { + blockchain, + operationId, + }; + return this.commandExecutor.add({ + name: 'blockchainEpochCheckCommand', + data: commandData, + period: this.calculateCommandPeriod(), + }); + }), + ); + + return Command.empty(); + } + + /** + * Recover system from failure + * @param command + * @param error + */ + async recover(command) { + this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`); + + return Command.repeat(); + } + + /** + * Builds default epochCheckCommand + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'epochCheckCommand', + data: {}, + transactional: false, + period: this.calculateCommandPeriod(), + }; + Object.assign(command, map); + return command; + } +} + +export default EpochCheckCommand; diff --git a/src/commands/protocols/common/simple-asset-sync-command.js b/src/commands/protocols/common/simple-asset-sync-command.js index dd580c1c74..cd08272a32 100644 --- a/src/commands/protocols/common/simple-asset-sync-command.js +++ b/src/commands/protocols/common/simple-asset-sync-command.js @@ -137,26 +137,29 @@ class SimpleAssetSyncCommand extends Command { blockchain, OPERATION_ID_STATUS.COMMIT_PROOF.SIMPLE_ASSET_SYNC_END, ); - - if (getResult?.status === OPERATION_ID_STATUS.COMPLETED) { + const getOperationCachedData = await this.operationIdService.getCachedOperationIdData( + getOperationId, + ); + if (getOperationCachedData.message === 'Unable to find assertion on the network!') { this.logger.info( - `[SIMPLE_ASSET_SYNC] (${operationId}): Successfully executed command for the ` + + `[SIMPLE_ASSET_SYNC] (${operationId}): Failed to executed command. Couldn't find asset on the network for the ` + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}, `, ); - return this.continueSequence(command.data, command.sequence, { - retries: COMMAND_RETRIES.SUBMIT_COMMIT, - }); + return Command.empty(); } - - this.logger.log( - `[SIMPLE_ASSET_SYNC] (${operationId}): Failed to executed command. Couldn't find asset on the network for the ` + + this.logger.info( + `[SIMPLE_ASSET_SYNC] (${operationId}): Successfully executed command for the ` + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}, `, ); + + return this.continueSequence(command.data, command.sequence, { + retries: COMMAND_RETRIES.SUBMIT_COMMIT, + }); } async retryFinished(command) { diff --git a/src/constants/constants.js b/src/constants/constants.js index 6d46ad577e..842a1e32e4 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -239,7 +239,7 @@ export const NEURO_DEFAULT_GAS_PRICE = { export const CONTRACT_FUNCTION_FIXED_GAS_PRICE = { 'otp:2043': { - SUBMIT_UPDATE_COMMIT: 15, + SUBMIT_UPDATE_COMMIT: 30, }, }; @@ -337,6 +337,7 @@ export const ERROR_TYPE = { COMMIT_PROOF: { CALCULATE_PROOFS_ERROR: 'CalculateProofsError', EPOCH_CHECK_ERROR: 'EpochCheckError', + BLOCKCHAIN_EPOCH_CHECK_ERROR: 'BlockchainEpochCheckError', SIMPLE_ASSET_SYNC_ERROR: 'SimpleAssetSyncError', SUBMIT_COMMIT_ERROR: 'SubmitCommitError', SUBMIT_COMMIT_SEND_TX_ERROR: 'SubmitCommitSendTxError', diff --git a/src/migration/get-old-service-agreements-migration.js b/src/migration/get-old-service-agreements-migration.js new file mode 100644 index 0000000000..871dd68ffb --- /dev/null +++ b/src/migration/get-old-service-agreements-migration.js @@ -0,0 +1,133 @@ +import BaseMigration from './base-migration.js'; +import { SERVICE_AGREEMENT_SOURCES } from '../constants/constants.js'; + +const BATCH_SIZE = 50; +const GNOSIS_MAINNET_CHAIN_ID = 'gnosis:100'; +const GNOSIS_MAINNET_ASSET_STORAGE_CONTRACT_ADDRESS = '0xf81a8c0008de2dcdb73366cf78f2b178616d11dd'; + +class GetOldServiceAgreementsMigration extends BaseMigration { + constructor( + migrationName, + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + serviceAgreementService, + ) { + super(migrationName, logger, config); + this.repositoryModuleManager = repositoryModuleManager; + this.blockchainModuleManager = blockchainModuleManager; + this.serviceAgreementService = serviceAgreementService; + } + + async executeMigration() { + const blockchainId = this.blockchainModuleManager + .getImplementationNames() + .find((s) => s === GNOSIS_MAINNET_CHAIN_ID); + + if (blockchainId) { + const contract = GNOSIS_MAINNET_ASSET_STORAGE_CONTRACT_ADDRESS; + + const existingTokenIds = + await this.repositoryModuleManager.getServiceAgreementsTokenIds(0, blockchainId); + + const latestTokenId = Number( + await this.blockchainModuleManager.getLatestTokenId(blockchainId, contract), + ); + + const missingTokenIds = []; + let expectedTokenId = 0; + existingTokenIds.forEach((serviceAgreement) => { + while (serviceAgreement.tokenId > expectedTokenId) { + missingTokenIds.push(expectedTokenId); + expectedTokenId += 1; + } + expectedTokenId += 1; + }); + + for ( + let i = (existingTokenIds[existingTokenIds.length - 1] ?? -1) + 1; + i <= latestTokenId; + i += 1 + ) { + missingTokenIds.push(i); + } + + let batchNumber = 0; + // Check < or <= condition + while (batchNumber * BATCH_SIZE < missingTokenIds.length) { + const promises = []; + for ( + let i = batchNumber * BATCH_SIZE; + i < missingTokenIds.length && i < (batchNumber + 1) * BATCH_SIZE; + i += 1 + ) { + const tokenIdToBeFetched = missingTokenIds[i]; + promises.push( + this.getAndProcessMissingServiceAgreement( + tokenIdToBeFetched, + blockchainId, + contract, + ), + ); + } + + // eslint-disable-next-line no-await-in-loop + const missingAgreements = await Promise.all(promises); + + // eslint-disable-next-line no-await-in-loop + await this.repositoryModuleManager.bulkCreateServiceAgreementRecords( + missingAgreements.filter((agreement) => agreement != null), + ); + batchNumber += 1; + } + } + } + + async getAndProcessMissingServiceAgreement(tokenIdToBeFetched, blockchainId, contract) { + try { + const assertionIds = await this.blockchainModuleManager.getAssertionIds( + blockchainId, + contract, + tokenIdToBeFetched, + ); + const keyword = this.blockchainModuleManager.encodePacked( + blockchainId, + ['address', 'bytes32'], + [contract, assertionIds[0]], + ); + const agreementId = this.serviceAgreementService.generateId( + blockchainId, + contract, + tokenIdToBeFetched, + keyword, + 1, + ); + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchainId, + agreementId, + ); + return { + blockchainId, + assetStorageContractAddress: contract, + tokenId: tokenIdToBeFetched, + agreementId, + startTime: agreementData.startTime, + epochsNumber: agreementData.epochsNumber, + epochLength: agreementData.epochLength, + scoreFunctionId: agreementData.scoreFunctionId, + stateIndex: 0, + assertionId: assertionIds[0], + hashFunctionId: 1, + keyword, + proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc, + dataSource: SERVICE_AGREEMENT_SOURCES.BLOCKCHAIN, + }; + } catch (error) { + this.logger.warn(`Unable to fetch agreement data for token id: ${tokenIdToBeFetched}`); + return null; + } + } +} + +export default GetOldServiceAgreementsMigration; diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js index 986d375e6f..d829507702 100644 --- a/src/migration/migration-executor.js +++ b/src/migration/migration-executor.js @@ -17,6 +17,7 @@ import UalExtensionTripleStoreMigration from './ual-extension-triple-store-migra import MarkStakingEventsAsProcessedMigration from './mark-staking-events-as-processed-migration.js'; import RemoveServiceAgreementsForChiadoMigration from './remove-service-agreements-for-chiado-migration.js'; import MultipleOpWalletsUserConfigurationMigration from './multiple-op-wallets-user-configuration-migration.js'; +import GetOldServiceAgreementsMigration from './get-old-service-agreements-migration.js'; class MigrationExecutor { static async executePullShardingTableMigration(container, logger, config) { @@ -415,6 +416,32 @@ class MigrationExecutor { } } + static async executeGetOldServiceAgreementsMigration(container, logger, config) { + if (process.env.NODE_ENV !== NODE_ENVIRONMENTS.MAINNET) return; + + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + const serviceAgreementService = container.resolve('serviceAgreementService'); + + const migration = new GetOldServiceAgreementsMigration( + 'getOldServiceAgreementsMigration', + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + serviceAgreementService, + ); + if (!(await migration.migrationAlreadyExecuted())) { + try { + await migration.migrate(); + } catch (error) { + logger.error( + `Unable to execute get old service agreements migration. Error: ${error.message}`, + ); + } + } + } + static exitNode(code = 0) { process.exit(code); } diff --git a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js index 34f64bb5b6..7ba9c69b4d 100644 --- a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js +++ b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js @@ -155,6 +155,10 @@ class OtParachainService extends Web3Service { if (!isRpcError) throw error; } + async getLatestTokenId(assetContractAddress) { + return this.provider.getStorageAt(assetContractAddress.toString().toLowerCase(), 7); + } + async restartParachainProvider() { this.rpcNumber = (this.rpcNumber + 1) % this.config.rpcEndpoints.length; this.logger.warn( diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 583fd3aef6..5a0fb63667 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -31,7 +31,7 @@ const require = createRequire(import.meta.url); const ABIs = { ContentAsset: require('dkg-evm-module/abi/ContentAsset.json'), - ContentAssetStorage: require('dkg-evm-module/abi/ContentAssetStorage.json'), + ContentAssetStorage: require('dkg-evm-module/abi/ContentAssetStorageV2.json'), AssertionStorage: require('dkg-evm-module/abi/AssertionStorage.json'), Staking: require('dkg-evm-module/abi/Staking.json'), StakingStorage: require('dkg-evm-module/abi/StakingStorage.json'), @@ -1055,7 +1055,17 @@ class Web3Service { } async getLatestTokenId(assetContractAddress) { - return this.provider.getStorageAt(assetContractAddress.toString().toLowerCase(), 7); + const assetStorageContractInstance = + this.assetStorageContracts[assetContractAddress.toString().toLowerCase()]; + if (!assetStorageContractInstance) + throw new Error('Unknown asset storage contract address'); + + const lastTokenId = await this.callContractFunction( + assetStorageContractInstance, + 'lastTokenId', + [], + ); + return lastTokenId; } getAssetStorageContractAddresses() { @@ -1096,7 +1106,6 @@ class Web3Service { 'getAgreementData', [agreementId], ); - return { startTime: result['0'].toNumber(), epochsNumber: result['1'], diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index 5682b06970..57e8ef9101 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -251,6 +251,17 @@ class ServiceAgreementRepository { order: [['token_id', 'asc']], }); } + + async getServiceAgreementsTokenIds(fromTokenId, blockchainId) { + return this.model.findAll({ + attributes: ['tokenId'], + where: { + tokenId: { [Sequelize.Op.gte]: fromTokenId }, + blockchainId, + }, + order: [['token_id', 'asc']], + }); + } } export default ServiceAgreementRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index b510558bd9..7e1498e831 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -442,6 +442,13 @@ class RepositoryModuleManager extends BaseModuleManager { async getServiceAgreements(fromTokenId, batchSize) { return this.getRepository('service_agreement').getServiceAgreements(fromTokenId, batchSize); } + + async getServiceAgreementsTokenIds(fromTokenId, blockchainId) { + return this.getRepository('service_agreement').getServiceAgreementsTokenIds( + fromTokenId, + blockchainId, + ); + } } export default RepositoryModuleManager;