Skip to content

Commit

Permalink
Added cleaner commands (#2005)
Browse files Browse the repository at this point in the history
  • Loading branch information
djordjekovac authored Aug 30, 2022
1 parent 32e64b3 commit de88312
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 22 deletions.
33 changes: 12 additions & 21 deletions src/commands/command-executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,6 @@ const { forEach } = require('p-iteration');
const Command = require('./command');
const constants = require('../constants/constants');

/**
* Command statuses
* @type {{failed: string, expired: string, started: string, pending: string, completed: string}}
*/
const STATUS = {
failed: 'FAILED',
expired: 'EXPIRED',
started: 'STARTED',
pending: 'PENDING',
completed: 'COMPLETED',
repeating: 'REPEATING',
};

/**
* How many commands will run in parallel
* @type {number}
Expand Down Expand Up @@ -106,7 +93,7 @@ class CommandExecutor {
if (command.deadline_at && now > command.deadline_at) {
this.logger.warn(`Command ${command.name} and ID ${command.id} is too late...`);
await this._update(command, {
status: STATUS.expired,
status: constants.COMMAND_STATUS.EXPIRED,
});
try {
const result = await handler.expired(command);
Expand Down Expand Up @@ -137,7 +124,7 @@ class CommandExecutor {
await this._update(
command,
{
status: STATUS.started,
status: constants.COMMAND_STATUS.STARTED,
},
transaction,
);
Expand All @@ -148,7 +135,7 @@ class CommandExecutor {
await this._update(
command,
{
status: STATUS.repeating,
status: constants.COMMAND_STATUS.REPEATING,
},
transaction,
);
Expand Down Expand Up @@ -179,7 +166,7 @@ class CommandExecutor {
await this._update(
command,
{
status: STATUS.completed,
status: constants.COMMAND_STATUS.COMPLETED,
},
transaction,
);
Expand Down Expand Up @@ -293,7 +280,7 @@ class CommandExecutor {
if (command.retries > 1) {
command.data = handler.pack(command.data);
await this._update(command, {
status: STATUS.pending,
status: constants.COMMAND_STATUS.PENDING,
retries: command.retries - 1,
});
const period = command.period ? command.period : 0;
Expand Down Expand Up @@ -322,7 +309,7 @@ class CommandExecutor {
} else {
try {
await this._update(command, {
status: STATUS.failed,
status: constants.COMMAND_STATUS.FAILED,
message: err.message,
});
this.logger.warn(`Error in command: ${command.name}, error: ${err.message}`);
Expand Down Expand Up @@ -359,7 +346,7 @@ class CommandExecutor {
const commandInstance = this.commandResolver.resolve(command.name);
command.data = commandInstance.pack(command.data);
}
command.status = STATUS.pending;
command.status = constants.COMMAND_STATUS.PENDING;
const opts = {};
if (transaction != null) {
opts.transaction = transaction;
Expand Down Expand Up @@ -412,7 +399,11 @@ class CommandExecutor {
this.logger.info('Replay pending/started commands from the database...');
const pendingCommands = (
await this.repositoryModuleManager.getCommandsWithStatus(
[STATUS.pending, STATUS.started, STATUS.repeating],
[
constants.COMMAND_STATUS.PENDING,
constants.COMMAND_STATUS.STARTED,
constants.COMMAND_STATUS.REPEATING,
],
['cleanerCommand', 'autoupdaterCommand'],
)
).filter((command) => !constants.PERMANENT_COMMANDS.includes(command.name));
Expand Down
57 changes: 57 additions & 0 deletions src/commands/common/commands-cleaner-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
const Command = require('../command');
const {
COMMAND_STATUS,
FINALIZED_COMMAND_CLEANUP_TIME_MILLS,
} = require('../../constants/constants');

/**
* Increases approval for Bidding contract on blockchain
*/
class CommandsCleanerCommand extends Command {
constructor(ctx) {
super(ctx);
this.logger = ctx.logger;
this.repositoryModuleManager = ctx.repositoryModuleManager;
}

/**
* Executes command and produces one or more events
* @param command
*/
async execute() {
await this.repositoryModuleManager.removeFinalizedCommands([
COMMAND_STATUS.COMPLETED,
COMMAND_STATUS.FAILED,
COMMAND_STATUS.EXPIRED,
]);
return Command.repeat();
}

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command, error) {
this.logger.warn(`Failed to clean finalized commands: error: ${error.message}`);
return Command.repeat();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'commandsCleanerCommand',
data: {},
period: FINALIZED_COMMAND_CLEANUP_TIME_MILLS,
transactional: false,
};
Object.assign(command, map);
return command;
}
}

module.exports = CommandsCleanerCommand;
54 changes: 54 additions & 0 deletions src/commands/common/operation-id-cleaner-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const Command = require('../command');
const constants = require('../../constants/constants');

/**
* Increases approval for Bidding contract on blockchain
*/
class OperationIdCleanerCommand extends Command {
constructor(ctx) {
super(ctx);
this.logger = ctx.logger;
this.repositoryModuleManager = ctx.repositoryModuleManager;
}

/**
* Executes command and produces one or more events
* @param command
*/
async execute() {
const timeToBeDeleted = Date.now() - constants.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS;
await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [
constants.OPERATION_ID_STATUS.COMPLETED,
constants.OPERATION_ID_STATUS.FAILED,
]);
return Command.repeat();
}

/**
* Recover system from failure
* @param command
* @param error
*/
async recover(command, error) {
this.logger.warn(`Failed to clean operation ids table: error: ${error.message}`);
return Command.repeat();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'operationIdCleanerCommand',
period: constants.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
data: {},
transactional: false,
};
Object.assign(command, map);
return command;
}
}

module.exports = OperationIdCleanerCommand;
30 changes: 29 additions & 1 deletion src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ exports.OPERATION_IDS_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
* @constant {Array} PERMANENT_COMMANDS - List of all permanent commands
*/
exports.PERMANENT_COMMANDS = ['otnodeUpdateCommand', 'sendTelemetryCommand'];
exports.PERMANENT_COMMANDS = [
'otnodeUpdateCommand',
'sendTelemetryCommand',
'operationIdCleanerCommand',
'commandsCleanerCommand',
];

/**
* @constant {number} MAX_COMMAND_DELAY_IN_MILLS - Maximum delay for commands
Expand Down Expand Up @@ -248,6 +253,29 @@ exports.OPERATIONS = {
SEARCH: 'search',
};

/**
* @constant {number} OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS -
* operation id command cleanup interval time 24h
*/
exports.OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
* @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time
* finalized commands command cleanup interval time 24h
*/
exports.FINALIZED_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
* @constant {number} COMMAND_STATUS -
* Status for commands
*/
exports.COMMAND_STATUS = {
FAILED: 'FAILED',
EXPIRED: 'EXPIRED',
STARTED: 'STARTED',
PENDING: 'PENDING',
COMPLETED: 'COMPLETED',
REPEATING: 'REPEATING',
};

/**
* @constant {object} NETWORK_PROTOCOLS -
* Network protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ class SequelizeRepository {
});
}

async removeFinalizedCommands(finalizedStatuses) {
await this.models.commands.destroy({
where: {
status: { [Sequelize.Op.in]: finalizedStatuses },
started_at: { [Sequelize.Op.lte]: Date.now() },
},
});
}

// OPERATION_ID
async createOperationIdRecord(handlerData) {
const handlerRecord = await this.models.operation_ids.create(handlerData);
Expand All @@ -164,6 +173,15 @@ class SequelizeRepository {
});
}

async removeOperationIdRecord(timeToBeDeleted, statuses) {
await this.models.operation_ids.destroy({
where: {
timestamp: { [Sequelize.Op.lt]: timeToBeDeleted },
status: { [Sequelize.Op.in]: statuses },
},
});
}

async getNumberOfNodesFoundForPublish(publishId) {
return this.models.publish.findOne({
attributes: ['nodes_found'],
Expand Down
15 changes: 15 additions & 0 deletions src/modules/repository/repository-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class RepositoryModuleManager extends BaseModuleManager {
}
}

async removeFinalizedCommands(finalizedStatuses) {
if (this.initialized) {
return this.getImplementation().module.removeFinalizedCommands(finalizedStatuses);
}
}

// OPERATION ID TABLE
async createOperationIdRecord(handlerData) {
if (this.initialized) {
Expand All @@ -64,6 +70,15 @@ class RepositoryModuleManager extends BaseModuleManager {
}
}

async removeOperationIdRecord(timeToBeDeleted, statuses) {
if (this.initialized) {
return this.getImplementation().module.removeOperationIdRecord(
timeToBeDeleted,
statuses,
);
}
}

// publish table
async createOperationRecord(operation, operationId, status) {
if (this.initialized) {
Expand Down

0 comments on commit de88312

Please sign in to comment.