diff --git a/ot-node.js b/ot-node.js index e2e59b0224..6bdcf4fa9b 100644 --- a/ot-node.js +++ b/ot-node.js @@ -54,6 +54,12 @@ class OTNode { await this.initializeCommandExecutor(); await this.initializeShardingTableService(); + await MigrationExecutor.executeMarkStakingEventsAsProcessedMigration( + this.container, + this.logger, + this.config, + ); + MigrationExecutor.executeUalExtensionTripleStoreMigration( this.container, this.logger, @@ -62,6 +68,12 @@ class OTNode { await this.initializeBlockchainEventListenerService(); }); + await MigrationExecutor.executePullShardingTableMigration( + this.container, + this.logger, + this.config, + ); + await this.initializeRouters(); await this.startNetworkModule(); this.startTelemetryModule(); diff --git a/package-lock.json b/package-lock.json index 96ed40c12e..43c41e49d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.1.0", + "version": "6.1.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.1.0", + "version": "6.1.1", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 04b8a57868..7ddcc1b845 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.1.0", + "version": "6.1.1", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/migration/mark-staking-events-as-processed-migration.js b/src/migration/mark-staking-events-as-processed-migration.js new file mode 100644 index 0000000000..2fe63ac39d --- /dev/null +++ b/src/migration/mark-staking-events-as-processed-migration.js @@ -0,0 +1,25 @@ +/* eslint-disable no-await-in-loop */ +import BaseMigration from './base-migration.js'; + +class MarkStakingEventsAsProcessedMigration extends BaseMigration { + constructor(migrationName, logger, config, repositoryModuleManager, blockchainModuleManager) { + super(migrationName, logger, config); + this.repositoryModuleManager = repositoryModuleManager; + this.blockchainModuleManager = blockchainModuleManager; + } + + async executeMigration() { + this.logger.info('Marking old blockchain events as processed'); + for (const blockchain of this.blockchainModuleManager.getImplementationNames()) { + const timestamp = Date.now(); + const block = await this.blockchainModuleManager.getLatestBlock(blockchain); + const query = `update blockchain + set last_checked_block = ${block.number}, + last_checked_timestamp = ${timestamp} + where blockchain_id = 'otp:2043'`; + await this.repositoryModuleManager.query(query); + } + } +} + +export default MarkStakingEventsAsProcessedMigration; diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js index c639c10b47..847a9308cf 100644 --- a/src/migration/migration-executor.js +++ b/src/migration/migration-executor.js @@ -14,6 +14,7 @@ import ServiceAgreementsDataInspector from './service-agreements-data-inspector. import ServiceAgreementsInvalidDataMigration from './service-agreements-invalid-data-migration.js'; import UalExtensionUserConfigurationMigration from './ual-extension-user-configuration-migration.js'; import UalExtensionTripleStoreMigration from './ual-extension-triple-store-migration.js'; +import MarkStakingEventsAsProcessedMigration from './mark-staking-events-as-processed-migration.js'; class MigrationExecutor { static async executePullShardingTableMigration(container, logger, config) { @@ -28,7 +29,7 @@ class MigrationExecutor { const validationModuleManager = container.resolve('validationModuleManager'); const migration = new PullBlockchainShardingTableMigration( - 'pullShardingTableMigrationV613', + 'pullShardingTableMigrationV611', logger, config, repositoryModuleManager, @@ -338,6 +339,31 @@ class MigrationExecutor { } } + static async executeMarkStakingEventsAsProcessedMigration(container, logger, config) { + if (process.env.NODE_ENV !== NODE_ENVIRONMENTS.MAINNET) return; + + const repositoryModuleManager = container.resolve('repositoryModuleManager'); + const blockchainModuleManager = container.resolve('blockchainModuleManager'); + + const migration = new MarkStakingEventsAsProcessedMigration( + 'markStakingEventsAsProcessedMigration', + logger, + config, + repositoryModuleManager, + blockchainModuleManager, + ); + if (!(await migration.migrationAlreadyExecuted())) { + try { + await migration.migrate(); + } catch (error) { + logger.error( + `Unable to execute mark staking events as processed migration. Error: ${error.message}`, + ); + this.exitNode(1); + } + } + } + static exitNode(code = 0) { process.exit(code); } diff --git a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js index c03087cadb..f19fe1530f 100644 --- a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js @@ -7,20 +7,30 @@ class BlockchainEventRepository { } async insertBlockchainEvents(events) { - const inserted = await this.model.bulkCreate( - events.map((event) => ({ - contract: event.contract, - event: event.event, - data: event.data, - block: event.block, - blockchainId: event.blockchainId, - processed: false, - })), - { - ignoreDuplicates: true, - }, - ); - return inserted.map((event) => event.dataValues); + const chunkSize = 10000; + let insertedEvents = []; + + for (let i = 0; i < events.length; i += chunkSize) { + const chunk = events.slice(i, i + chunkSize); + // eslint-disable-next-line no-await-in-loop + const insertedChunk = await this.model.bulkCreate( + chunk.map((event) => ({ + contract: event.contract, + event: event.event, + data: event.data, + block: event.block, + blockchainId: event.blockchainId, + processed: false, + })), + { + ignoreDuplicates: true, + }, + ); + + insertedEvents = insertedEvents.concat(insertedChunk.map((event) => event.dataValues)); + } + + return insertedEvents; } async getAllUnprocessedBlockchainEvents(eventNames, blockchainId) {