diff --git a/external/blazegraph-service.js b/external/blazegraph-service.js index c3a87dc327..0f50369493 100644 --- a/external/blazegraph-service.js +++ b/external/blazegraph-service.js @@ -164,7 +164,7 @@ class BlazegraphService { for (const bn of blankNodes) { const bnValue = Number(bn.substring(3)); bnName = `_:c14n${bnValue - minimumBlankNodeValue}`; - nquads[nquadIndex] = nquad.replace(bn, bnName); + nquads[nquadIndex] = nquads[nquadIndex].replace(bn, bnName); } } } diff --git a/modules/command/common/send-telemetry-command.js b/modules/command/common/send-telemetry-command.js index d831edebce..763e82fbf2 100644 --- a/modules/command/common/send-telemetry-command.js +++ b/modules/command/common/send-telemetry-command.js @@ -23,10 +23,11 @@ class SendTelemetryCommand extends Command { this.telemetryHubModuleManager.aggregateTelemetryData() .then((jsonld) => { if (jsonld) { + this.logger.restart(); Models.handler_ids.create({ status: 'PENDING', }).then((insertedObject) => { - this.publishService.publish(JSON.stringify(jsonld), '.json', [`ot-telemetry-${Math.floor(new Date() / (60 * 1000))}`], 'public', undefined, insertedObject.dataValues.handler_id); + this.publishService.publish(JSON.stringify(jsonld), '.json', [`ot-telemetry-${Math.floor(new Date() / (60 * 1000))}`], 'public', undefined, insertedObject.dataValues.handler_id, true); }); } }) diff --git a/modules/command/publish/submit-proofs-command.js b/modules/command/publish/submit-proofs-command.js index 08b699947c..7bf229a6b8 100644 --- a/modules/command/publish/submit-proofs-command.js +++ b/modules/command/publish/submit-proofs-command.js @@ -11,6 +11,8 @@ class SubmitProofsCommand extends Command { this.dataService = ctx.dataService; this.fileService = ctx.fileService; this.workerPool = ctx.workerPool; + + this.blockchainQueue = ctx.blockchainQueue.promise(this, this.sendTransaction, 1); } /** @@ -18,24 +20,23 @@ class SubmitProofsCommand extends Command { * @param command */ async execute(command) { - const { documentPath, handlerId, method } = command.data; + const { documentPath, handlerId, method, isUrgent } = command.data; try { let { nquads, assertion } = await this.fileService.loadJsonFromFile(documentPath); + this.logger.info(`Sending transaction to the blockchain`); let result; - switch (method) { - case 'publish': - result = await this.blockchainService.createAssertionRecord(assertion.id, assertion.rootHash, assertion.metadata.issuer); - break; - case 'provision': - result = await this.blockchainService.registerAsset(assertion.metadata.UALs[0],assertion.metadata.type,assertion.metadata.UALs[0],assertion.id, assertion.rootHash, 1); - break; - case 'update': - result = await this.blockchainService.updateAsset(assertion.metadata.UALs[0],assertion.id, assertion.rootHash); - break; + if (isUrgent){ + result = await this.blockchainQueue.unshift({method, assertion}); + }else { + if (this.blockchainQueue.length()>constants.BLOCKCHAIN_QUEUE_LIMIT){ + throw new Error ('Blockchain queue is full'); + } + result = await this.blockchainQueue.push({method, assertion}); } + const { transactionHash, blockchain } = result; this.logger.info(`Transaction hash is ${transactionHash} on ${blockchain}`); @@ -61,6 +62,24 @@ class SubmitProofsCommand extends Command { return this.continueSequence(command.data, command.sequence); } + async sendTransaction (args){ + const { assertion, method} = args; + let result; + switch (method) { + case 'publish': + result = await this.blockchainService.createAssertionRecord(assertion.id, assertion.rootHash, assertion.metadata.issuer); + break; + case 'provision': + result = await this.blockchainService.registerAsset(assertion.metadata.UALs[0],assertion.metadata.type,assertion.metadata.UALs[0],assertion.id, assertion.rootHash, 1); + break; + case 'update': + result = await this.blockchainService.updateAsset(assertion.metadata.UALs[0],assertion.id, assertion.rootHash); + break; + } + + return result; + } + /** * Recover system from failure * @param command diff --git a/modules/constants.js b/modules/constants.js index a4adc078d2..9816759655 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -63,6 +63,13 @@ exports.TRIPLE_STORE_CONNECT_MAX_RETRIES = 10; */ exports.TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; // 10 seconds + +/** + * @constant {number} BLOCKCHAIN_QUEUE_LIMIT + * - Blockchain queue limit + */ +exports.BLOCKCHAIN_QUEUE_LIMIT = 25000; + /** * @constant {object} TRIPLE_STORE_IMPLEMENTATION - * Names of available triple store implementations diff --git a/modules/logger/logger.js b/modules/logger/logger.js index 9bc1e9b165..e55bc3ebd6 100644 --- a/modules/logger/logger.js +++ b/modules/logger/logger.js @@ -3,6 +3,11 @@ const path = require('path'); class Logger { constructor(logLevel = 'trace', telemetryHubEnabled) { + this.logLevel = logLevel; + this.initialize(logLevel, telemetryHubEnabled); + } + + initialize(logLevel, telemetryHubEnabled){ try { const logFilename = path.join(path.resolve(__dirname, '../../'), 'logs/active.log'); let chosenTargets = []; @@ -31,6 +36,10 @@ class Logger { } } + restart(){ + this.initialize(this.logLevel, true); + } + fatal(obj) { this.pinoLogger.fatal(obj); } diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index e8ee3076bc..ec7fbb99cf 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -12,7 +12,7 @@ class PublishService { this.workerPool = ctx.workerPool; } - async publish(fileContent, fileExtension, keywords, visibility, ual, handlerId) { + async publish(fileContent, fileExtension, keywords, visibility, ual, handlerId, isUrgent = false) { let { assertion, nquads, @@ -70,7 +70,7 @@ class PublishService { sequence: commandSequence.slice(1), delay: 0, data: { - documentPath, handlerId, method + documentPath, handlerId, method, isUrgent }, transactional: false, }); diff --git a/ot-node.js b/ot-node.js index 2b431546b8..9d5dec8af7 100644 --- a/ot-node.js +++ b/ot-node.js @@ -9,6 +9,8 @@ const constants = require('./modules/constants'); const db = require('./models'); const pjson = require('./package.json'); const configjson = require('./config/config.json'); +const queue = require('fastq') + class OTNode { constructor(config) { @@ -64,6 +66,7 @@ class OTNode { DependencyInjection.registerValue(this.container, 'config', this.config); DependencyInjection.registerValue(this.container, 'logger', this.logger); DependencyInjection.registerValue(this.container, 'constants', constants); + DependencyInjection.registerValue(this.container, 'blockchainQueue', queue); this.logger.info('Dependency injection module is initialized'); } diff --git a/package.json b/package.json index cf11edd72c..5f805f92f6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.1.24", + "version": "6.0.0-beta.1.25", "description": "OTNode v6 Beta 1", "main": "index.js", "scripts": { @@ -55,6 +55,7 @@ "express-fileupload": "^1.2.1", "express-ipfilter": "^1.2.0", "fast-sort": "^3.1.1", + "fastq": "^1.13.0", "fs-extra": "^10.0.0", "graphdb": "^2.0.0", "it-concat": "^2.0.0", @@ -75,7 +76,7 @@ "multiformats": "^9.4.7", "mysql2": "^2.3.3", "n3": "^1.12.2", - "ot-telemetry-collector": "^1.0.4", + "ot-telemetry-collector": "^1.0.6", "p-iteration": "^1.1.8", "peer-id": "^0.15.3", "pino": "^7.5.1",