From de883125d416b1d3dca34598eaccf18cca9ce277 Mon Sep 17 00:00:00 2001 From: djordjekovac Date: Tue, 30 Aug 2022 13:00:20 +0200 Subject: [PATCH] Added cleaner commands (#2005) --- src/commands/command-executor.js | 33 ++++------- .../common/commands-cleaner-command.js | 57 +++++++++++++++++++ .../common/operation-id-cleaner-command.js | 54 ++++++++++++++++++ src/constants/constants.js | 30 +++++++++- .../sequelize/sequelize-repository.js | 18 ++++++ .../repository/repository-module-manager.js | 15 +++++ 6 files changed, 185 insertions(+), 22 deletions(-) create mode 100644 src/commands/common/commands-cleaner-command.js create mode 100644 src/commands/common/operation-id-cleaner-command.js diff --git a/src/commands/command-executor.js b/src/commands/command-executor.js index 62d723e178..1abef9752a 100644 --- a/src/commands/command-executor.js +++ b/src/commands/command-executor.js @@ -5,19 +5,6 @@ const { forEach } = require('p-iteration'); const Command = require('./command'); const constants = require('../constants/constants'); -/** - * Command statuses - * @type {{failed: string, expired: string, started: string, pending: string, completed: string}} - */ -const STATUS = { - failed: 'FAILED', - expired: 'EXPIRED', - started: 'STARTED', - pending: 'PENDING', - completed: 'COMPLETED', - repeating: 'REPEATING', -}; - /** * How many commands will run in parallel * @type {number} @@ -106,7 +93,7 @@ class CommandExecutor { if (command.deadline_at && now > command.deadline_at) { this.logger.warn(`Command ${command.name} and ID ${command.id} is too late...`); await this._update(command, { - status: STATUS.expired, + status: constants.COMMAND_STATUS.EXPIRED, }); try { const result = await handler.expired(command); @@ -137,7 +124,7 @@ class CommandExecutor { await this._update( command, { - status: STATUS.started, + status: constants.COMMAND_STATUS.STARTED, }, transaction, ); @@ -148,7 +135,7 @@ class CommandExecutor { await this._update( command, { - status: STATUS.repeating, + status: constants.COMMAND_STATUS.REPEATING, }, transaction, ); @@ -179,7 +166,7 @@ class CommandExecutor { await this._update( command, { - status: STATUS.completed, + status: constants.COMMAND_STATUS.COMPLETED, }, transaction, ); @@ -293,7 +280,7 @@ class CommandExecutor { if (command.retries > 1) { command.data = handler.pack(command.data); await this._update(command, { - status: STATUS.pending, + status: constants.COMMAND_STATUS.PENDING, retries: command.retries - 1, }); const period = command.period ? command.period : 0; @@ -322,7 +309,7 @@ class CommandExecutor { } else { try { await this._update(command, { - status: STATUS.failed, + status: constants.COMMAND_STATUS.FAILED, message: err.message, }); this.logger.warn(`Error in command: ${command.name}, error: ${err.message}`); @@ -359,7 +346,7 @@ class CommandExecutor { const commandInstance = this.commandResolver.resolve(command.name); command.data = commandInstance.pack(command.data); } - command.status = STATUS.pending; + command.status = constants.COMMAND_STATUS.PENDING; const opts = {}; if (transaction != null) { opts.transaction = transaction; @@ -412,7 +399,11 @@ class CommandExecutor { this.logger.info('Replay pending/started commands from the database...'); const pendingCommands = ( await this.repositoryModuleManager.getCommandsWithStatus( - [STATUS.pending, STATUS.started, STATUS.repeating], + [ + constants.COMMAND_STATUS.PENDING, + constants.COMMAND_STATUS.STARTED, + constants.COMMAND_STATUS.REPEATING, + ], ['cleanerCommand', 'autoupdaterCommand'], ) ).filter((command) => !constants.PERMANENT_COMMANDS.includes(command.name)); diff --git a/src/commands/common/commands-cleaner-command.js b/src/commands/common/commands-cleaner-command.js new file mode 100644 index 0000000000..d6326d2440 --- /dev/null +++ b/src/commands/common/commands-cleaner-command.js @@ -0,0 +1,57 @@ +const Command = require('../command'); +const { + COMMAND_STATUS, + FINALIZED_COMMAND_CLEANUP_TIME_MILLS, +} = require('../../constants/constants'); + +/** + * Increases approval for Bidding contract on blockchain + */ +class CommandsCleanerCommand extends Command { + constructor(ctx) { + super(ctx); + this.logger = ctx.logger; + this.repositoryModuleManager = ctx.repositoryModuleManager; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute() { + await this.repositoryModuleManager.removeFinalizedCommands([ + COMMAND_STATUS.COMPLETED, + COMMAND_STATUS.FAILED, + COMMAND_STATUS.EXPIRED, + ]); + return Command.repeat(); + } + + /** + * Recover system from failure + * @param command + * @param error + */ + async recover(command, error) { + this.logger.warn(`Failed to clean finalized commands: error: ${error.message}`); + return Command.repeat(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'commandsCleanerCommand', + data: {}, + period: FINALIZED_COMMAND_CLEANUP_TIME_MILLS, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +module.exports = CommandsCleanerCommand; diff --git a/src/commands/common/operation-id-cleaner-command.js b/src/commands/common/operation-id-cleaner-command.js new file mode 100644 index 0000000000..beb74a2d15 --- /dev/null +++ b/src/commands/common/operation-id-cleaner-command.js @@ -0,0 +1,54 @@ +const Command = require('../command'); +const constants = require('../../constants/constants'); + +/** + * Increases approval for Bidding contract on blockchain + */ +class OperationIdCleanerCommand extends Command { + constructor(ctx) { + super(ctx); + this.logger = ctx.logger; + this.repositoryModuleManager = ctx.repositoryModuleManager; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute() { + const timeToBeDeleted = Date.now() - constants.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS; + await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [ + constants.OPERATION_ID_STATUS.COMPLETED, + constants.OPERATION_ID_STATUS.FAILED, + ]); + return Command.repeat(); + } + + /** + * Recover system from failure + * @param command + * @param error + */ + async recover(command, error) { + this.logger.warn(`Failed to clean operation ids table: error: ${error.message}`); + return Command.repeat(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'operationIdCleanerCommand', + period: constants.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + data: {}, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +module.exports = OperationIdCleanerCommand; diff --git a/src/constants/constants.js b/src/constants/constants.js index f3d93e01ad..fc1b447c66 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -79,7 +79,12 @@ exports.OPERATION_IDS_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; /** * @constant {Array} PERMANENT_COMMANDS - List of all permanent commands */ -exports.PERMANENT_COMMANDS = ['otnodeUpdateCommand', 'sendTelemetryCommand']; +exports.PERMANENT_COMMANDS = [ + 'otnodeUpdateCommand', + 'sendTelemetryCommand', + 'operationIdCleanerCommand', + 'commandsCleanerCommand', +]; /** * @constant {number} MAX_COMMAND_DELAY_IN_MILLS - Maximum delay for commands @@ -248,6 +253,29 @@ exports.OPERATIONS = { SEARCH: 'search', }; +/** + * @constant {number} OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS - + * operation id command cleanup interval time 24h + */ +exports.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +/** + * @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time + * finalized commands command cleanup interval time 24h + */ +exports.FINALIZED_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +/** + * @constant {number} COMMAND_STATUS - + * Status for commands + */ +exports.COMMAND_STATUS = { + FAILED: 'FAILED', + EXPIRED: 'EXPIRED', + STARTED: 'STARTED', + PENDING: 'PENDING', + COMPLETED: 'COMPLETED', + REPEATING: 'REPEATING', +}; + /** * @constant {object} NETWORK_PROTOCOLS - * Network protocols diff --git a/src/modules/repository/implementation/sequelize/sequelize-repository.js b/src/modules/repository/implementation/sequelize/sequelize-repository.js index f9038100bf..6566f47cd4 100644 --- a/src/modules/repository/implementation/sequelize/sequelize-repository.js +++ b/src/modules/repository/implementation/sequelize/sequelize-repository.js @@ -141,6 +141,15 @@ class SequelizeRepository { }); } + async removeFinalizedCommands(finalizedStatuses) { + await this.models.commands.destroy({ + where: { + status: { [Sequelize.Op.in]: finalizedStatuses }, + started_at: { [Sequelize.Op.lte]: Date.now() }, + }, + }); + } + // OPERATION_ID async createOperationIdRecord(handlerData) { const handlerRecord = await this.models.operation_ids.create(handlerData); @@ -164,6 +173,15 @@ class SequelizeRepository { }); } + async removeOperationIdRecord(timeToBeDeleted, statuses) { + await this.models.operation_ids.destroy({ + where: { + timestamp: { [Sequelize.Op.lt]: timeToBeDeleted }, + status: { [Sequelize.Op.in]: statuses }, + }, + }); + } + async getNumberOfNodesFoundForPublish(publishId) { return this.models.publish.findOne({ attributes: ['nodes_found'], diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 1c217d5412..77537eba47 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -45,6 +45,12 @@ class RepositoryModuleManager extends BaseModuleManager { } } + async removeFinalizedCommands(finalizedStatuses) { + if (this.initialized) { + return this.getImplementation().module.removeFinalizedCommands(finalizedStatuses); + } + } + // OPERATION ID TABLE async createOperationIdRecord(handlerData) { if (this.initialized) { @@ -64,6 +70,15 @@ class RepositoryModuleManager extends BaseModuleManager { } } + async removeOperationIdRecord(timeToBeDeleted, statuses) { + if (this.initialized) { + return this.getImplementation().module.removeOperationIdRecord( + timeToBeDeleted, + statuses, + ); + } + } + // publish table async createOperationRecord(operation, operationId, status) { if (this.initialized) {