From c905149c9cd21ea81e3cb3bd441a589b9aaac8a9 Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Tue, 23 Mar 2021 16:36:38 +0100 Subject: [PATCH 01/21] Support database migration of nodes with NODE_ENV=mariner --- .../202010201032128-update-events-add-blockchain-id.js | 6 +++++- .../202011261221123-update-offers-add-blockchain-id.js | 6 +++++- .../202011261243123-update-bids-add-blockchain-id.js | 6 +++++- ...1416123-update-data-info-extend-data-provider-wallet.js | 6 +++++- ...02011301208123-update-data-sellers-add-blockchain-id.js | 7 ++++++- ...202011301210123-update-data-trades-add-blockchain-id.js | 7 ++++++- ...012090818123-update-purchased-data-add-blockchain-id.js | 7 ++++++- .../202012151048123-update-challenges-add-blockchain-id.js | 7 ++++++- .../202102120853043-update-commands-add-blockchain-id.js | 7 ++++++- 9 files changed, 50 insertions(+), 9 deletions(-) diff --git a/migrations/202010201032128-update-events-add-blockchain-id.js b/migrations/202010201032128-update-events-add-blockchain-id.js index b5c0a34376..d10dde7c57 100644 --- a/migrations/202010201032128-update-events-add-blockchain-id.js +++ b/migrations/202010201032128-update-events-add-blockchain-id.js @@ -5,7 +5,11 @@ if (!process.env.NODE_ENV) { process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202011261221123-update-offers-add-blockchain-id.js b/migrations/202011261221123-update-offers-add-blockchain-id.js index 56c36141ee..cd62107df2 100644 --- a/migrations/202011261221123-update-offers-add-blockchain-id.js +++ b/migrations/202011261221123-update-offers-add-blockchain-id.js @@ -5,7 +5,11 @@ if (!process.env.NODE_ENV) { process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202011261243123-update-bids-add-blockchain-id.js b/migrations/202011261243123-update-bids-add-blockchain-id.js index 8dbe4f5fdf..e55526c5c0 100644 --- a/migrations/202011261243123-update-bids-add-blockchain-id.js +++ b/migrations/202011261243123-update-bids-add-blockchain-id.js @@ -5,7 +5,11 @@ if (!process.env.NODE_ENV) { process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; +const environment = process.env.NODE_ENV === 'mariner' ? 
'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202011261416123-update-data-info-extend-data-provider-wallet.js b/migrations/202011261416123-update-data-info-extend-data-provider-wallet.js index e740a8451f..70901de680 100644 --- a/migrations/202011261416123-update-data-info-extend-data-provider-wallet.js +++ b/migrations/202011261416123-update-data-info-extend-data-provider-wallet.js @@ -5,7 +5,11 @@ if (!process.env.NODE_ENV) { process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202011301208123-update-data-sellers-add-blockchain-id.js b/migrations/202011301208123-update-data-sellers-add-blockchain-id.js index 83ce2b698a..2df2244268 100644 --- a/migrations/202011301208123-update-data-sellers-add-blockchain-id.js +++ b/migrations/202011301208123-update-data-sellers-add-blockchain-id.js @@ -4,7 +4,12 @@ if (!process.env.NODE_ENV) { // Environment not set. Use the production. process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; + +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202011301210123-update-data-trades-add-blockchain-id.js b/migrations/202011301210123-update-data-trades-add-blockchain-id.js index 1fc356d4da..2a1c7bd3cc 100644 --- a/migrations/202011301210123-update-data-trades-add-blockchain-id.js +++ b/migrations/202011301210123-update-data-trades-add-blockchain-id.js @@ -4,7 +4,12 @@ if (!process.env.NODE_ENV) { // Environment not set. Use the production. process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; + +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202012090818123-update-purchased-data-add-blockchain-id.js b/migrations/202012090818123-update-purchased-data-add-blockchain-id.js index 6c0c17f1e4..4e3a4f7d1b 100644 --- a/migrations/202012090818123-update-purchased-data-add-blockchain-id.js +++ b/migrations/202012090818123-update-purchased-data-add-blockchain-id.js @@ -4,7 +4,12 @@ if (!process.env.NODE_ENV) { // Environment not set. Use the production. 
process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; + +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202012151048123-update-challenges-add-blockchain-id.js b/migrations/202012151048123-update-challenges-add-blockchain-id.js index 63fcad866e..f7e47429a9 100644 --- a/migrations/202012151048123-update-challenges-add-blockchain-id.js +++ b/migrations/202012151048123-update-challenges-add-blockchain-id.js @@ -4,7 +4,12 @@ if (!process.env.NODE_ENV) { // Environment not set. Use the production. process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; + +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { diff --git a/migrations/202102120853043-update-commands-add-blockchain-id.js b/migrations/202102120853043-update-commands-add-blockchain-id.js index ab8d638c08..fbb67a36bc 100644 --- a/migrations/202102120853043-update-commands-add-blockchain-id.js +++ b/migrations/202102120853043-update-commands-add-blockchain-id.js @@ -4,7 +4,12 @@ if (!process.env.NODE_ENV) { // Environment not set. Use the production. process.env.NODE_ENV = 'testnet'; } -const environmentConfig = global_config[process.env.NODE_ENV]; + +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const environmentConfig = global_config[environment]; const blockchain_id = environmentConfig.blockchain.implementations[0].network_id; module.exports = { From 57537705341ce3737e8f36395ca4c56cc20ca5b1 Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Tue, 23 Mar 2021 16:49:41 +0100 Subject: [PATCH 02/21] Update export worker for mariner nodes --- modules/worker/export-worker.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/modules/worker/export-worker.js b/modules/worker/export-worker.js index 919146474d..81c77f0bb4 100644 --- a/modules/worker/export-worker.js +++ b/modules/worker/export-worker.js @@ -5,7 +5,16 @@ const Utilities = require('../Utilities'); const ImportUtilities = require('../ImportUtilities'); const OtJsonUtilities = require('../OtJsonUtilities'); const fs = require('fs'); -const defaultConfig = require('../../config/config')[process.env.NODE_ENV]; + +if (!process.env.NODE_ENV) { + // Environment not set. Use the production. + process.env.NODE_ENV = 'testnet'; +} +const environment = process.env.NODE_ENV === 'mariner' ? 
'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const defaultConfig = require('../../config/config')[environment]; process.on('message', async (data) => { const { From 02566d49d4dcfc544a57d0db0da49842f4708f47 Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Tue, 23 Mar 2021 16:53:31 +0100 Subject: [PATCH 03/21] Update Utilities modules for mariner nodes --- modules/ImportUtilities.js | 10 +++++++++- modules/OtJsonUtilities.js | 11 ++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/modules/ImportUtilities.js b/modules/ImportUtilities.js index 68e125d9b2..35cddd9165 100644 --- a/modules/ImportUtilities.js +++ b/modules/ImportUtilities.js @@ -13,7 +13,15 @@ const OtJsonUtilities = require('./OtJsonUtilities'); const DataIntegrityResolver = require('./service/data-integrity/data-integrity-resolver'); // TODO Is this safe to read, IE will it always be the same, // (could the node somehow change this in runtime? ) -const defaultConfig = require('../config/config')[process.env.NODE_ENV]; +if (!process.env.NODE_ENV) { + // Environment not set. Use the production. + process.env.NODE_ENV = 'testnet'; +} +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const defaultConfig = require('../config/config')[environment]; const data_constants = { vertexType: { diff --git a/modules/OtJsonUtilities.js b/modules/OtJsonUtilities.js index d7e4bcc32d..63a0cfd16a 100644 --- a/modules/OtJsonUtilities.js +++ b/modules/OtJsonUtilities.js @@ -1,6 +1,15 @@ const Utilities = require('./Utilities'); const { sha3_256 } = require('js-sha3'); -const defaultConfig = require('../config/config')[process.env.NODE_ENV]; + +if (!process.env.NODE_ENV) { + // Environment not set. Use the production. + process.env.NODE_ENV = 'testnet'; +} +const environment = process.env.NODE_ENV === 'mariner' ? 'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const defaultConfig = require('../config/config')[environment]; class OtJsonUtilities { From 0fa552114954ceaa2ed6098f3a60e6b615bc9483 Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Tue, 23 Mar 2021 16:55:26 +0100 Subject: [PATCH 04/21] Update logger module for mariner nodes --- modules/logger.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/modules/logger.js b/modules/logger.js index db34f478c3..d76a257238 100644 --- a/modules/logger.js +++ b/modules/logger.js @@ -5,7 +5,15 @@ require('winston-papertrail').Papertrail; require('winston-loggly-bulk'); const util = require('util'); -const runtimeConfigJson = require('../config/config.json')[process.env.NODE_ENV]; +if (!process.env.NODE_ENV) { + // Environment not set. Use the production. + process.env.NODE_ENV = 'testnet'; +} +const environment = process.env.NODE_ENV === 'mariner' ? 
'mainnet' : process.env.NODE_ENV; +if (['mainnet', 'testnet', 'development'].indexOf(environment) < 0) { + throw Error(`Unsupported node environment ${environment}`); +} +const runtimeConfigJson = require('../config/config.json')[environment]; const colors = require('colors/safe'); const pjson = require('../package.json'); From 9a209670c1d3584be27d5f2451c32742d4e28fd7 Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Wed, 14 Apr 2021 11:08:59 +0200 Subject: [PATCH 05/21] Added check for duplicated events in database --- modules/Blockchain.js | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/modules/Blockchain.js b/modules/Blockchain.js index 3b24cb4d56..5830df63c6 100644 --- a/modules/Blockchain.js +++ b/modules/Blockchain.js @@ -704,25 +704,37 @@ class Blockchain { async handleReceivedEvents(events, contractName, blockchain_id) { for (let i = 0; events && i < events.length; i += 1) { const event = events[i]; - const timestamp = Date.now(); + if (event.returnValues.DH_wallet) { event.returnValues.DH_wallet = event.returnValues.DH_wallet.toLowerCase(); } - /* eslint-disable-next-line */ - await Models.events.create({ - id: uuidv4(), - contract: contractName, - event: event.event, - data: JSON.stringify(event.returnValues), - data_set_id: Utilities.normalizeHex(event.returnValues.dataSetId), - block: event.blockNumber, - blockchain_id, - timestamp, - finished: 0, + const eventData = JSON.stringify(event.returnValues); + // eslint-disable-next-line no-await-in-loop + const databaseEvent = await Models.events.findOne({ + where: { + contract: contractName, + event: event.event, + data: eventData, + block: event.blockNumber, + blockchain_id, + }, }); + if (!databaseEvent) { + const timestamp = Date.now(); + /* eslint-disable-next-line */ + await Models.events.create({ + id: uuidv4(), + contract: contractName, + event: event.event, + data: eventData, + data_set_id: Utilities.normalizeHex(event.returnValues.dataSetId), + block: event.blockNumber, + blockchain_id, + timestamp, + finished: 0, + }); + } } - - const twoWeeksAgo = new Date(); twoWeeksAgo.setDate(twoWeeksAgo.getDate() - 14); // Delete old events From 5f5ea9aadca6d198137de3f01b6bbaebb7e6550a Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Wed, 14 Apr 2021 15:06:11 +0200 Subject: [PATCH 06/21] Add a concurrency block for emitting blockchain events in the internal event emitter --- modules/Blockchain.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/Blockchain.js b/modules/Blockchain.js index 5830df63c6..e5c6668513 100644 --- a/modules/Blockchain.js +++ b/modules/Blockchain.js @@ -561,10 +561,16 @@ class Blockchain { } const that = this; + let processingEvents = false; + const handle = setInterval(async () => { if (!that.appState.started) { return; } + if (processingEvents) { + return; + } + processingEvents = true; const where = { event, @@ -582,6 +588,8 @@ class Blockchain { await data.save(); }); } + + processingEvents = false; }, 2000); return handle; From 144f213f4497a85534bf502dcad204cedce8e3ee Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Wed, 14 Apr 2021 16:20:22 +0200 Subject: [PATCH 07/21] Skip offer handling if a bid already exists for that offer --- modules/service/dh-service.js | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/modules/service/dh-service.js b/modules/service/dh-service.js index 53790990c1..5aff3b29da 100644 --- a/modules/service/dh-service.js +++ b/modules/service/dh-service.js @@ -95,6 +95,17 @@ class DHService { 
return; // the offer is mine } + const existingBid = await Models.bids.findOne({ + where: { + offer_id: offerId, + }, + }); + + if (existingBid) { + return; + } + + this.logger.notify(`Offer ${offerId} has been created by ${dcNodeId} on blockchain ${blockchain_id}.`); if (dataSetSizeInBytes) { const dataSizeInMB = dataSetSizeInBytes / 1000000; From 4b96bd5e1c6a8da3fd3546b1cd37d7c22bc62a3b Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Thu, 15 Apr 2021 09:48:55 +0200 Subject: [PATCH 08/21] Release concurrency flag even if the execution failed --- modules/Blockchain.js | 48 ++++++++++++++++++++++--------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/modules/Blockchain.js b/modules/Blockchain.js index e5c6668513..4680bd0f6a 100644 --- a/modules/Blockchain.js +++ b/modules/Blockchain.js @@ -564,32 +564,34 @@ class Blockchain { let processingEvents = false; const handle = setInterval(async () => { - if (!that.appState.started) { - return; - } - if (processingEvents) { - return; - } - processingEvents = true; + try { + if (!that.appState.started) { + return; + } + if (processingEvents) { + return; + } + processingEvents = true; - const where = { - event, - finished: 0, - [Op.or]: blockStartConditions, - }; + const where = { + event, + finished: 0, + [Op.or]: blockStartConditions, + }; - const eventData = await Models.events.findAll({ where }); - if (eventData) { - eventData.forEach(async (data) => { - const dataToSend = JSON.parse(data.dataValues.data); - dataToSend.blockchain_id = data.dataValues.blockchain_id; - this.emitter.emit(`eth-${data.event}`, dataToSend); - data.finished = 1; - await data.save(); - }); + const eventData = await Models.events.findAll({ where }); + if (eventData) { + eventData.forEach(async (data) => { + const dataToSend = JSON.parse(data.dataValues.data); + dataToSend.blockchain_id = data.dataValues.blockchain_id; + this.emitter.emit(`eth-${data.event}`, dataToSend); + data.finished = 1; + await data.save(); + }); + } + } finally { + processingEvents = false; } - - processingEvents = false; }, 2000); return handle; From 86b6fe3c6695155176c99bd1529b84ffd94adf9c Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Thu, 15 Apr 2021 10:34:12 +0200 Subject: [PATCH 09/21] Bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 242486d62d..21f810edc2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "5.0.0", + "version": "5.0.1", "description": "OriginTrail node", "main": ".eslintrc.js", "config": { From 2e935df53db81d58c4d89d641820518214966b72 Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Tue, 20 Apr 2021 15:02:03 +0200 Subject: [PATCH 10/21] Fix pricing service unit test (#1517) --- test/modules/service/pricing-service-test.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/modules/service/pricing-service-test.js b/test/modules/service/pricing-service-test.js index 7318db8284..60838649f7 100644 --- a/test/modules/service/pricing-service-test.js +++ b/test/modules/service/pricing-service-test.js @@ -1,3 +1,5 @@ +process.env.NODE_ENV = 'mainnet'; + const { describe, before, beforeEach, it, } = require('mocha'); From e40e85cba5cfbceea5ea33e34ac5fc28257a7266 Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Tue, 20 Apr 2021 20:13:39 +0200 Subject: [PATCH 11/21] Prevent automatically updating between major versions (#1514) * Prevent 
automatically updating between major versions * Added unit test for major version update * version bump Co-authored-by: Djordje Kovacevic --- modules/command/common/autoupdater-command.js | 4 +- .../common/autoupdater-command.test.js | 78 ++++++++++++++++++- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/modules/command/common/autoupdater-command.js b/modules/command/common/autoupdater-command.js index 8a825c1961..e96ec7a893 100644 --- a/modules/command/common/autoupdater-command.js +++ b/modules/command/common/autoupdater-command.js @@ -66,8 +66,8 @@ class AutoupdaterCommand extends Command { if (semver.lt(currentVersion, remoteVersion)) { this.logger.info('New version found'); - if (remoteVersion === '5.0.0') { - this.logger.trace('New version available is 5.0.0. Please run update process manually.'); + if (semver.major(currentVersion) < semver.major(remoteVersion)) { + this.logger.important(`New major update available. Please run update to version ${remoteVersion} manually.`); return Command.repeat(); } diff --git a/test/modules/command/common/autoupdater-command.test.js b/test/modules/command/common/autoupdater-command.test.js index e79c74d6dc..6e58135066 100644 --- a/test/modules/command/common/autoupdater-command.test.js +++ b/test/modules/command/common/autoupdater-command.test.js @@ -3,7 +3,7 @@ const { describe, before, after, it, beforeEach, afterEach, } = require('mocha'); const { expect } = require('chai'); - +const semver = require('semver'); const fs = require('fs'); const rc = require('rc'); const restify = require('restify'); @@ -21,6 +21,12 @@ const pjson = require('../../../../package.json'); const logger = require('../../../../modules/logger'); +const patchVersion = semver.patch(pjson.version); +const majorVersion = semver.major(pjson.version); +const minorVersion = semver.minor(pjson.version); +const newPatchVersion = `${majorVersion}.${minorVersion}.${patchVersion + 1}`; +const newMajorVersion = `${majorVersion + 1}.${minorVersion}.${patchVersion}`; + class MockProcess { constructor() { this.env = process.env; @@ -106,7 +112,7 @@ describe('Checks AutoupdaterCommand logic', () => { it('should check for update and prepare the update', async () => { const remotePjson = Object.assign({}, dummyAppPackageJson); - remotePjson.version = '999.0.0'; + remotePjson.version = newPatchVersion; const remoteSourceDirname = uuidv4(); const remoteSourcePath = path.join(tmpdir, remoteSourceDirname); @@ -179,7 +185,7 @@ describe('Checks AutoupdaterCommand logic', () => { it('should fail to prepare update if server wont send archive', async () => { const remotePjson = Object.assign({}, dummyAppPackageJson); - remotePjson.version = '999.0.0'; + remotePjson.version = newPatchVersion; config.autoUpdater = { enabled: true, @@ -247,7 +253,7 @@ describe('Checks AutoupdaterCommand logic', () => { it('should fail if invalid archive returned', async () => { const remotePjson = Object.assign({}, dummyAppPackageJson); - remotePjson.version = '999.0.0'; + remotePjson.version = newPatchVersion; config.autoUpdater = { enabled: true, @@ -284,6 +290,70 @@ describe('Checks AutoupdaterCommand logic', () => { expect(returnedErrorCode).to.equal(-1); // Should not be called. 
}); + it('should fail if major version is updated', async () => { + const remotePjson = Object.assign({}, dummyAppPackageJson); + remotePjson.version = newMajorVersion; + + const remoteSourceDirname = uuidv4(); + const remoteSourcePath = path.join(tmpdir, remoteSourceDirname); + const remoteZipPath = path.join(tmpdir, `${uuidv4()}.zip`); + const basePath = path.join(tmpdir, uuidv4()); + const initPath = path.join(basePath, 'init'); + const currentPath = path.join(basePath, 'current'); + fs.mkdirSync(basePath); + fs.mkdirSync(initPath); + fs.symlinkSync(initPath, currentPath); + + fs.mkdirSync(remoteSourcePath); + fs.writeFileSync( + path.join(remoteSourcePath, 'package.json'), + JSON.stringify(remotePjson, null, 4), + ); + fs.writeFileSync( + path.join(remoteSourcePath, 'index.js'), + '', + ); + execSync( + `zip -r ${remoteZipPath} ${remoteSourceDirname}/`, + { cwd: tmpdir }, + ); + + config.autoUpdater = { + enabled: true, + packageJsonUrl: `${serverBaseUrl}/package.json`, + archiveUrl: `${serverBaseUrl}/release.zip`, + }; + + server.get('/package.json', (req, res, next) => { + res.send(remotePjson); + next(); + }); + + server.get('/release.zip', (req, res, next) => { + const file = fs.readFileSync(remoteZipPath); + res.writeHead(200); + res.write(file); + res.end(); + return next(); + }); + + const command = new AutoupdaterCommand( + context, + { + process: nodeProcess, + updateFilepath: path.join(currentPath, 'UPDATE'), + destinationBasedir: basePath, + }, + ); + + let returnedErrorCode = 0; + nodeProcess.exit = (errorCode) => { + returnedErrorCode = errorCode; + }; + expect(await command.execute()).to.deep.equal(Command.repeat()); + expect(returnedErrorCode).to.equal(0); + }); + afterEach('shutdown local server', (done) => { server.close(done); }); From 1c59c3b299779a12dba2a5f37863f5a69187e46a Mon Sep 17 00:00:00 2001 From: djordjekovac Date: Tue, 20 Apr 2021 23:35:32 +0200 Subject: [PATCH 12/21] Dataset pruning feature (#1515) * Added datasets pruning command to be executed every 24h --- config/config.json | 19 +- .../202104151212123-create-pruning-history.js | 26 +++ models/pruning_history.js | 18 ++ modules/Database/Arangojs.js | 59 +++-- modules/Database/GraphStorage.js | 6 +- .../command/common/dataset-pruning-command.js | 99 ++++++++ .../command/dc/dc-offer-finalized-command.js | 1 - modules/command/dh/dh-pay-out-command.js | 15 ++ modules/constants.js | 8 +- modules/service/dataset-pruning-service.js | 214 ++++++++++++++++++ modules/service/dh-service.js | 8 +- modules/worker/dataset-pruning-worker.js | 29 +++ package-lock.json | 2 +- package.json | 2 +- .../service/dataset-pruning-service-test.js | 131 +++++++++++ test/modules/utilities.test.js | 2 +- 16 files changed, 599 insertions(+), 40 deletions(-) create mode 100644 migrations/202104151212123-create-pruning-history.js create mode 100644 models/pruning_history.js create mode 100644 modules/command/common/dataset-pruning-command.js create mode 100644 modules/service/dataset-pruning-service.js create mode 100644 modules/worker/dataset-pruning-worker.js create mode 100644 test/modules/service/dataset-pruning-service-test.js diff --git a/config/config.json b/config/config.json index 29217ad194..25d0160703 100644 --- a/config/config.json +++ b/config/config.json @@ -217,6 +217,11 @@ "username": "ot_node", "password": "origintrail", "root_user_password": "origintrail" + }, + "dataset_pruning": { + "enabled": false, + "imported_pruning_delay_in_minutes": 1440, + "replicated_pruning_delay_in_minutes": 1440 } }, "testnet": { @@ 
-373,8 +378,8 @@ }, "autoUpdater": { "enabled": true, - "packageJsonUrl": "https://raw.githubusercontent.com/OriginTrail/ot-node/release/testnet/package.json", - "archiveUrl": "https://github.com/OriginTrail/ot-node/archive/release/testnet.zip" + "packageJsonUrl": "https://raw.githubusercontent.com/OriginTrail/ot-node/feature/dataset-pruning/package.json", + "archiveUrl": "https://github.com/OriginTrail/ot-node/archive/feature/dataset-pruning.zip" }, "dataSetStorage": "data_set_storage", "dc_holding_time_in_minutes": 60, @@ -412,6 +417,11 @@ "username": "ot_node", "password": "origintrail", "root_user_password": "origintrail" + }, + "dataset_pruning": { + "enabled": false, + "imported_pruning_delay_in_minutes": 1440, + "replicated_pruning_delay_in_minutes": 1440 } }, "mainnet": { @@ -580,6 +590,11 @@ "username": "ot_node", "password": "origintrail", "root_user_password": "origintrail" + }, + "dataset_pruning": { + "enabled": false, + "imported_pruning_delay_in_minutes": 1440, + "replicated_pruning_delay_in_minutes": 1440 } } } diff --git a/migrations/202104151212123-create-pruning-history.js b/migrations/202104151212123-create-pruning-history.js new file mode 100644 index 0000000000..a8c4ef8a4a --- /dev/null +++ b/migrations/202104151212123-create-pruning-history.js @@ -0,0 +1,26 @@ +module.exports = { + up: async (queryInterface, Sequelize) => { + await queryInterface.createTable('pruning_history', { + id: { + allowNull: false, + primaryKey: true, + type: Sequelize.STRING, + }, + data_set_id: { + allowNull: false, + type: Sequelize.STRING, + }, + imported_timestamp: { + allowNull: false, + type: Sequelize.STRING, + }, + pruned_timestamp: { + allowNull: false, + type: Sequelize.STRING, + }, + }); + }, + down: async (queryInterface) => { + await queryInterface.dropTable('pruning_history'); + }, +}; diff --git a/models/pruning_history.js b/models/pruning_history.js new file mode 100644 index 0000000000..bedc105981 --- /dev/null +++ b/models/pruning_history.js @@ -0,0 +1,18 @@ +const uuidv4 = require('uuid/v4'); + +module.exports = (sequelize, DataTypes) => { + const pruning_history = sequelize.define('pruning_history', { + id: { + type: DataTypes.STRING, + defaultValue: () => uuidv4(), + primaryKey: true, + }, + data_set_id: DataTypes.STRING, + imported_timestamp: DataTypes.STRING, + pruned_timestamp: DataTypes.STRING, + }, {}); + pruning_history.associate = (models) => { + // associations can be defined here + }; + return pruning_history; +}; diff --git a/modules/Database/Arangojs.js b/modules/Database/Arangojs.js index d4f6b5c770..a60691f5fd 100644 --- a/modules/Database/Arangojs.js +++ b/modules/Database/Arangojs.js @@ -792,33 +792,42 @@ class ArangoJS { /** * Removes hanging data set ID - * @param dataSetID + * @param datasetId * @returns {Promise} */ - async removeDataSetId(dataSetID) { - const queryString = 'LET documents = (' + - ' FOR d IN __COLLECTION__' + - ' FILTER' + - ' d.datasets != null' + - ' AND' + - ' POSITION(d.datasets, @dataSetID, false) != false' + - ' SORT d._key RETURN d' + - ')' + - 'RETURN COUNT(\n' + - ' FOR d IN documents\n' + - ' LET pos = POSITION(d.datasets, @dataSetID, true)\n' + - ' LET dataSets = REMOVE_NTH(d.datasets, pos)\n' + - ' UPDATE { _key: d._key, datasets: dataSets } IN __COLLECTION__\n' + - ' RETURN 1)'; - - const edgesQuery = queryString.replace(/__COLLECTION__/g, 'ot_edges'); - const verticesQuery = queryString.replace(/__COLLECTION__/g, 'ot_vertices'); - const params = { - dataSetID, - }; - let count = await this.runQuery(edgesQuery, params); - 
count += await this.runQuery(verticesQuery, params); - return count; + async removeDataset(datasetId) { + const fetchActionQuery = 'LET datasetMetadata = DOCUMENT(\'ot_datasets\', @datasetId)\n' + + 'let datasetVertices = DOCUMENT(\'ot_vertices\', datasetMetadata.vertices)\n' + + 'let datasetEdges = DOCUMENT(\'ot_edges\', datasetMetadata.edges)\n' + + 'let verticesAction = (for v in datasetVertices\n' + + 'LET pos = POSITION(v.datasets, @datasetId, true)\n' + + 'LET datasets = pos == -1 ? v.datasets : REMOVE_NTH(v.datasets, pos)\n' + + 'let action = LENGTH(datasets) == 0 ? {action: \'delete\', key: v._key} : {action: \'update\', key: v._key, datasets: datasets}\n' + + 'return action)\n' + + 'let edgesAction = (for v in datasetEdges\n' + + 'LET pos = POSITION(v.datasets, @datasetId, true)\n' + + 'LET datasets = pos == -1 ? v.datasets : REMOVE_NTH(v.datasets, pos)\n' + + 'let action = LENGTH(datasets) == 0 ? {action: \'delete\', key: v._key} : {action: \'update\', key: v._key, datasets: datasets}\n' + + 'return action)\n' + + 'return {verticesAction: verticesAction, edgesAction: edgesAction}'; + const actions = await this.runQuery(fetchActionQuery, { datasetId }); + /* eslint-disable no-unused-expressions,import/no-unresolved,global-require */ + const action = String((params) => { + const { query } = require('@arangodb'); + query`for va in ${params.params.edgesAction} FILTER va.action == 'update' UPDATE { _key: va.key, datasets: va.datasets } IN 'ot_edges'`; + query`for va in ${params.params.verticesAction} FILTER va.action == 'update' UPDATE { _key: va.key, datasets: va.datasets } IN 'ot_vertices'`; + query`for va in ${params.params.edgesAction} FILTER va.action == 'delete' REMOVE { _key: va.key } IN 'ot_edges'`; + query`for va in ${params.params.verticesAction} FILTER va.action == 'delete' REMOVE { _key: va.key } IN 'ot_vertices'`; + query`REMOVE { _key: ${params.params.datasetId} } IN 'ot_datasets'`; + }); + /* eslint-enable no-unused-expressions,import/no-unresolved,global-require */ + await this.db.transaction(['ot_vertices', 'ot_edges', 'ot_datasets'], action, { + params: { + edgesAction: actions[0].edgesAction, + verticesAction: actions[0].verticesAction, + datasetId, + }, + }); } /** diff --git a/modules/Database/GraphStorage.js b/modules/Database/GraphStorage.js index 5bc0bfad46..befa3a9d8b 100644 --- a/modules/Database/GraphStorage.js +++ b/modules/Database/GraphStorage.js @@ -778,11 +778,11 @@ class GraphStorage { /** * Remove data set ID in documents from collections - * @param dataSetID Data set ID + * @param datasetId Data set ID * @returns {Promise} */ - async removeDataSetId(dataSetID) { - return this.db.removeDataSetId(dataSetID); + async removeDataset(datasetId) { + return this.db.removeDataset(datasetId); } /** diff --git a/modules/command/common/dataset-pruning-command.js b/modules/command/common/dataset-pruning-command.js new file mode 100644 index 0000000000..a1ccf66682 --- /dev/null +++ b/modules/command/common/dataset-pruning-command.js @@ -0,0 +1,99 @@ +const Command = require('../command'); +const constants = require('../../constants'); +const { fork } = require('child_process'); + +class DatasetPruningCommand extends Command { + constructor(ctx) { + super(ctx); + this.logger = ctx.logger; + this.config = ctx.config; + this.datasetPruningService = ctx.datasetPruningService; + this.commandExecutor = ctx.commandExecutor; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute(command) { if 
(!this.config.dataset_pruning.enabled) { + this.logger.debug('Dataset pruning command ignored.'); + return Command.empty(); + } + + const datasets = await this.datasetPruningService.fetchDatasetData(); + + if (!datasets) { + this.logger.trace('Found 0 datasets for pruning'); + return Command.repeat(); + } + + const repackedDatasets = this.datasetPruningService.repackDatasets(datasets); + + const forked = fork('modules/worker/dataset-pruning-worker.js'); + + forked.send(JSON.stringify({ + selectedDatabase: this.config.database, + importedPruningDelayInMinutes: this.config + .dataset_pruning.imported_pruning_delay_in_minutes, + replicatedPruningDelayInMinutes: this.config.dataset_pruning + .replicated_pruning_delay_in_minutes, + repackedDatasets, + })); + + forked.on('message', async (response) => { + if (response.error) { + this.logger.error(`Error while pruning datasets: ${response.error}`); + forked.kill(); + await this.addPruningCommandToExecutor(); + return; + } + const { + offerIdToBeDeleted, + dataInfoIdToBeDeleted, + datasetsToBeDeleted, + } = response; + if (datasetsToBeDeleted.length === 0) { + this.logger.trace('Found 0 datasets for pruning'); + return; + } + await this.datasetPruningService.removeEntriesWithId('offers', offerIdToBeDeleted); + await this.datasetPruningService.removeEntriesWithId('data_info', dataInfoIdToBeDeleted); + + await this.datasetPruningService.updatePruningHistory(datasetsToBeDeleted); + this.logger.info(`Successfully pruned ${datasetsToBeDeleted.length} datasets.`); + forked.kill(); + await this.addPruningCommandToExecutor(); + }); + this.logger.trace('Dataset pruning worker started'); + return Command.empty(); + } + + async addPruningCommandToExecutor() { + await this.commandExecutor.add({ + name: 'datasetPruningCommand', + delay: constants.DATASET_PRUNING_COMMAND_TIME_MILLS, + period: constants.DATASET_PRUNING_COMMAND_TIME_MILLS, + transactional: false, + }); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'datasetPruningCommand', + data: { + }, + period: constants.DATASET_PRUNING_COMMAND_TIME_MILLS, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +module.exports = DatasetPruningCommand; diff --git a/modules/command/dc/dc-offer-finalized-command.js b/modules/command/dc/dc-offer-finalized-command.js index a56fda69c4..34e68aa328 100644 --- a/modules/command/dc/dc-offer-finalized-command.js +++ b/modules/command/dc/dc-offer-finalized-command.js @@ -147,7 +147,6 @@ class DcOfferFinalizedCommand extends Command { const startTime = Date.now(); const endTime = startTime + (offer.holding_time_in_minutes * 60 * 1000); - // const vertices = await this.graphStorage.findVerticesByImportId(offer.data_set_id); const holders = [holder1, holder2, holder3].map(h => Utilities.normalizeHex(h)); await forEach(holders, async (holder) => { const replicatedData = await Models.replicated_data.findOne({ diff --git a/modules/command/dh/dh-pay-out-command.js b/modules/command/dh/dh-pay-out-command.js index 4ca28d2722..f42d6d3d79 100644 --- a/modules/command/dh/dh-pay-out-command.js +++ b/modules/command/dh/dh-pay-out-command.js @@ -74,6 +74,7 @@ class DhPayOutCommand extends Command { await this.blockchain .payOut(blockchainIdentity, offerId, urgent, blockchain_id).response; this.logger.important(`Payout for offer ${offerId} successfully completed on blockchain ${blockchain_id}.`); + await this._clearReplicationDatabaseData(offerId); 
await this._printBalances(blockchainIdentity, blockchain_id); } catch (error) { if (error.message.includes('Gas price higher than maximum allowed price')) { @@ -150,6 +151,20 @@ class DhPayOutCommand extends Command { this.logger.info(`Profile balance: ${profileBalanceInTRAC} TRAC`); } + async _clearReplicationDatabaseData(offerId) { + await Models.bids.destroy({ + where: { + offer_id: offerId, + status: 'COMPLETED', + }, + }); + + await Models.holding_data.destroy({ + where: { + offer_id: offerId, + }, + }); + } /** * Builds default command * @param map diff --git a/modules/constants.js b/modules/constants.js index 36b0be6495..11738d13ed 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -47,13 +47,19 @@ exports.TRAIL_COMMAND_CLEANUP_TIME_MILLS = 60 * 60 * 1000; */ exports.HANDLER_IDS_COMMAND_CLEANUP_TIME_MILLS = 60 * 60 * 1000; +/** + * @constant {number} DATASET_PRUNING_COMMAND_TIME_MILLS - + * Datasets pruning command interval 24h + */ +exports.DATASET_PRUNING_COMMAND_TIME_MILLS = 24 * 60 * 60 * 1000; + /** * @constant {Array} PERMANENT_COMMANDS - List of all permanent commands */ exports.PERMANENT_COMMANDS = [ 'cleanerCommand', 'dcChallengesCommand', 'dhLitigationInitiatedCommand', 'reputationUpdateCommand', 'autoupdaterCommand', 'exportCleanerCommand', - 'trailCleanerCommand', 'handlerIdsCleanerCommand', + 'trailCleanerCommand', 'handlerIdsCleanerCommand', 'datasetPruningCommand', ]; /** diff --git a/modules/service/dataset-pruning-service.js b/modules/service/dataset-pruning-service.js new file mode 100644 index 0000000000..e2dd3c0268 --- /dev/null +++ b/modules/service/dataset-pruning-service.js @@ -0,0 +1,214 @@ +const Models = require('../../models/index'); +const { QueryTypes } = require('sequelize'); + +class DatasetPruningService { + /** + * Default constructor + * @param ctx + */ + constructor(ctx) { + this.logger = ctx.logger; + this.graphStorage = ctx.graphStorage; + } + + getIdsForPruning( + repackedDatasets, + importedPruningDelayInMinutes, + replicatedPruningDelayInMinutes, + ) { + const importedPruningDelayInMilisec = importedPruningDelayInMinutes * 60 * 1000; + const replicatedPruningDelayInMilisec = replicatedPruningDelayInMinutes * 60 * 1000; + const datasetsToBeDeleted = []; + let dataInfoIdToBeDeleted = []; + let offerIdToBeDeleted = []; + + const now = (new Date()).getTime(); + Object.keys(repackedDatasets).forEach((key) => { + const dataset = repackedDatasets[key]; + // there are no bids or offers associated with datasetId + if (dataset.offers.length === 0 && dataset.bids.length === 0) { + let dataInfoDeletedCount = 0; + // import delay + dataset.dataInfo.forEach((dataInfo) => { + if (dataInfo.importTimestamp + + importedPruningDelayInMilisec < now) { + dataInfoIdToBeDeleted.push(dataInfo.id); + dataInfoDeletedCount += 1; + } + }); + if (dataset.dataInfo.length === dataInfoDeletedCount) { + const latestImportTimestamp = + Math.max(...dataset.dataInfo.map(di => di.importTimestamp)); + datasetsToBeDeleted.push({ + datasetId: key, + importTimestamp: latestImportTimestamp, + }); + } + } else { + const latestImportTimestamp = + Math.max(...dataset.dataInfo.map(di => di.importTimestamp)); + + // get offer ids for pruning + const offerIds = this.getIdsForPruningFromArray( + dataset.offers, + latestImportTimestamp, + replicatedPruningDelayInMilisec, + ); + const offersDeletedCount = offerIds.length; + offerIdToBeDeleted = offerIdToBeDeleted.concat(offerIds); + + // get bid ids for pruning + const bidIds = this.getIdsForPruningFromArray( + dataset.bids, + 
latestImportTimestamp, + replicatedPruningDelayInMilisec, + ); + const bidsDeletedCount = bidIds.length; + + // get data info ids for pruning + if (offersDeletedCount === dataset.offers.length && + bidsDeletedCount === dataset.bids.length) { + datasetsToBeDeleted.push({ + datasetId: key, + importTimestamp: latestImportTimestamp, + }); + dataInfoIdToBeDeleted = dataInfoIdToBeDeleted + .concat(dataset.dataInfo.map(di => di.id)); + } + } + }); + this.logger.trace(`Found ${datasetsToBeDeleted.length} datasets for pruning`); + + return { + datasetsToBeDeleted, + offerIdToBeDeleted, + dataInfoIdToBeDeleted, + }; + } + + async fetchDatasetData() { + const queryString = 'select di.id as data_info_id, offer.id as offer_id, bid.id as bid_id, pd.id as purchased_data_id, di.data_set_id, di.import_timestamp, \n' + + 'offer.holding_time_in_minutes as offer_holding_time_in_minutes,\n' + + 'bid.holding_time_in_minutes as bid_holding_time_in_minutes \n' + + 'from data_info as di\n' + + 'left join offers as offer on di.data_set_id = offer.data_set_id\n' + + 'left join bids as bid on di.data_set_id = bid.data_set_id\n' + + 'left join purchased_data as pd on di.data_set_id = pd.data_set_id'; + return Models.sequelize.query(queryString, { type: QueryTypes.SELECT }); + } + + async updatePruningHistory(datasetsToBeDeleted) { + const now = (new Date()).getTime(); + for (const dataset of datasetsToBeDeleted) { + // eslint-disable-next-line no-await-in-loop + await Models.pruning_history.create({ + data_set_id: dataset.datasetId, + imported_timestamp: dataset.importTimestamp, + pruned_timestamp: now, + }); + } + } + + async removeEntriesWithId(table, idArray) { + await Models[table].destroy({ + where: { + id: { [Models.sequelize.Op.in]: idArray }, + }, + }); + } + + async removeDatasetsFromGraphDb(datasets) { + for (const dataset of datasets) { + try { + // eslint-disable-next-line no-await-in-loop + await this.graphStorage.removeDataset(dataset.datasetId); + } catch (error) { + this.logger.error('Unable to prune dataset with id: ', dataset.datasetId); + } + } + } + + getIdsForPruningFromArray(array, latestImportTimestamp, replicatedPruningDelayInMilisec) { + const now = (new Date()).getTime(); + const idsForPruning = []; + if (array.length > 0) { + array.forEach((element) => { + if (latestImportTimestamp + + replicatedPruningDelayInMilisec + + (element.holdingTimeInMinutes * 60 * 1000) < now) { + idsForPruning.push(element.id); + } + }); + } + return idsForPruning; + } + + /** + * Returns repacked dataset information in format: + * { + * [data_set_id]: { + * dataInfo: [{ + * id: ... + * importTimestamp: ... + * }], + * offers: [{ + * id: ... + * holdingTimeInMinutes: ... + * }], + * bids: [{ + * id: ... + * holdingTimeInMinutes: ... 
+ * }] + * } + * } + * @param datasets + * @returns {{}} + */ + repackDatasets(datasets, includePurchased = false) { + const repackedDatasets = {}; + datasets.forEach((dataset) => { + if (!includePurchased && dataset.purchased_data_id) { + return; + } + if (!repackedDatasets[dataset.data_set_id]) { + repackedDatasets[dataset.data_set_id] = { + dataInfo: [], + offers: [], + bids: [], + }; + } + const foundDataInfoId = repackedDatasets[dataset.data_set_id].dataInfo + .some(el => el.id === dataset.data_info_id); + if (!foundDataInfoId) { + repackedDatasets[dataset.data_set_id].dataInfo.push({ + id: dataset.data_info_id, + importTimestamp: new Date(dataset.import_timestamp).getTime(), + }); + } + + if (dataset.offer_id) { + const foundOfferId = repackedDatasets[dataset.data_set_id].offers + .some(el => el.id === dataset.offer_id); + if (!foundOfferId) { + repackedDatasets[dataset.data_set_id].offers.push({ + id: dataset.offer_id, + holdingTimeInMinutes: dataset.offer_holding_time_in_minutes, + }); + } + } + if (dataset.bid_id) { + const foundBidId = repackedDatasets[dataset.data_set_id].bids + .some(el => el.id === dataset.bid_id); + if (!foundBidId) { + repackedDatasets[dataset.data_set_id].bids.push({ + id: dataset.bid_id, + holdingTimeInMinutes: dataset.bid_holding_time_in_minutes, + }); + } + } + }); + return repackedDatasets; + } +} + +module.exports = DatasetPruningService; diff --git a/modules/service/dh-service.js b/modules/service/dh-service.js index 5aff3b29da..2f1ddd6ccc 100644 --- a/modules/service/dh-service.js +++ b/modules/service/dh-service.js @@ -458,12 +458,10 @@ class DHService { // eslint-disable-next-line const importId = import_id; - const verticesPromise = this.graphStorage.findVerticesByImportId(importId); - const edgesPromise = this.graphStorage.findEdgesByImportId(importId); + const verticesAndEdges = await this.graphStorage + .getDatasetWithVerticesAndEdges(importId); - const values = await Promise.all([verticesPromise, edgesPromise]); - const vertices = values[0]; - const edges = values[1]; + const { vertices, edges } = verticesAndEdges; ImportUtilities.unpackKeys(vertices, edges); diff --git a/modules/worker/dataset-pruning-worker.js b/modules/worker/dataset-pruning-worker.js new file mode 100644 index 0000000000..80c932fab3 --- /dev/null +++ b/modules/worker/dataset-pruning-worker.js @@ -0,0 +1,29 @@ +const logger = require('../../modules/logger'); +const GraphStorage = require('../../modules/Database/GraphStorage'); +const DatasetPruningService = require('../../modules/service/dataset-pruning-service'); + +process.on('message', async (data) => { + const { + selectedDatabase, + importedPruningDelayInMinutes, + replicatedPruningDelayInMinutes, + repackedDatasets, + } = JSON.parse(data); + try { + const graphStorage = new GraphStorage(selectedDatabase, logger); + const datasetPruningService = new DatasetPruningService({ logger, graphStorage }); + const idsForPruning = datasetPruningService + .getIdsForPruning( + repackedDatasets, + importedPruningDelayInMinutes, + replicatedPruningDelayInMinutes, + ); + if (idsForPruning.datasetsToBeDeleted.length !== 0) { + await datasetPruningService + .removeDatasetsFromGraphDb(idsForPruning.datasetsToBeDeleted); + } + process.send(idsForPruning); + } catch (error) { + process.send({ error: `${error.message}` }); + } +}); diff --git a/package-lock.json b/package-lock.json index bcf82b5feb..969e0335c3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "5.0.0", + 
"version": "5.0.2", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 21f810edc2..b57cd77685 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "5.0.1", + "version": "5.0.2", "description": "OriginTrail node", "main": ".eslintrc.js", "config": { diff --git a/test/modules/service/dataset-pruning-service-test.js b/test/modules/service/dataset-pruning-service-test.js new file mode 100644 index 0000000000..bdb8c327b8 --- /dev/null +++ b/test/modules/service/dataset-pruning-service-test.js @@ -0,0 +1,131 @@ +const { + describe, before, beforeEach, it, +} = require('mocha'); +const { assert } = require('chai'); +const logger = require('../../../modules/logger'); +const DatasetPruningService = require('../../../modules/service/dataset-pruning-service'); + +const datasetPruningService = new DatasetPruningService({ logger }); +const importedPruningDelayInMinutes = 5; +const replicatedPruningDelayInMinutes = 5; + +const repackedDatasets = { + expired_dataset_id_1: { + dataInfo: [{ id: 'expired_data_info_id_1' }], + offers: [], + bids: [], + }, + expired_dataset_id_2: { + dataInfo: [{ id: 'expired_data_info_id_2' }], + offers: [{ id: 'expired_offer_id_1' }], + bids: [], + }, + expired_dataset_id_3: { + dataInfo: [{ id: 'expired_data_info_id_3' }], + offers: [], + bids: [{ id: 'expired_bid_id_1' }], + }, + expired_dataset_id_4: { + dataInfo: [{ id: 'expired_data_info_id_4' }], + offers: [{ id: 'expired_offer_id_2' }], + bids: [{ id: 'expired_bid_id_2' }], + }, + valid_dataset_id_1: { + dataInfo: [{ id: 'expired_data_info_id_6' }], + offers: [{ id: 'valid_offer_id_1' }], + bids: [], + }, + valid_dataset_id_2: { + dataInfo: [{ id: 'expired_data_info_id_6' }], + offers: [{ id: 'valid_offer_id_2' }, { id: 'expired_offer_id_3' }], + bids: [], + }, + valid_dataset_id_3: { + dataInfo: [{ id: 'expired_data_info_id_7' }], + offers: [], + bids: [{ id: 'valid_bid_id_1' }], + }, + valid_dataset_id_4: { + dataInfo: [{ id: 'expired_data_info_id_8' }], + offers: [], + bids: [{ id: 'valid_bid_id_2' }, { id: 'expired_bid_id_4' }], + }, + valid_dataset_id_5: { + dataInfo: [{ id: 'expired_data_info_id_9' }], + offers: [{ id: 'valid_offer_id_3' }], + bids: [{ id: 'valid_bid_id_3' }], + }, + valid_dataset_id_6: { + dataInfo: [{ id: 'expired_data_info_id_10' }], + offers: [{ id: 'expired_offer_id_4' }], + bids: [{ id: 'valid_bid_id_4' }], + }, + valid_dataset_id_7: { + dataInfo: [{ id: 'expired_data_info_id_11' }], + offers: [{ id: 'valid_offer_id_4' }], + bids: [{ id: 'expired_bid_id_5' }], + }, + valid_dataset_id_8: { + dataInfo: [{ id: 'valid_data_info_id_1' }], + offers: [{ id: 'expired_offer_id_5' }], + bids: [{ id: 'expired_bid_id_6' }], + }, + valid_dataset_id_9: { + dataInfo: [{ id: 'valid_data_info_id_2' }, { id: 'expired_data_info_id_12' }], + offers: [{ id: 'expired_offer_id_6' }], + bids: [{ id: 'expired_bid_id_7' }], + }, +}; + +const datasetsIdForPruning = [ + 'expired_dataset_id_1', + 'expired_dataset_id_2', + 'expired_dataset_id_3', + 'expired_dataset_id_4', +]; +const offerIdForPruning = [ + 'expired_offer_id_1', + 'expired_offer_id_2', + 'expired_offer_id_3', + 'expired_offer_id_4', +]; +const dataInfoIdForPruning = [ + 'expired_data_info_id_1', + 'expired_data_info_id_2', + 'expired_data_info_id_3', + 'expired_data_info_id_4', +]; + +describe('Dataset pruning service test', () => { + beforeEach('Setup container', async () => { + const now = Date.now(); + const expiredTimestamp = now - (2 * 
importedPruningDelayInMinutes * 60 * 1000); + const expiredHoldingTimeInMinutes = 1; + const validHoldingTimeInMinutes = 10; + + Object.keys(repackedDatasets).forEach((key) => { + const dataset = repackedDatasets[key]; + dataset.dataInfo.forEach((dataInfo) => { + dataInfo.importTimestamp = dataInfo.id.startsWith('expired') ? expiredTimestamp : now; + }); + dataset.offers.forEach((offer) => { + offer.holdingTimeInMinutes = offer.id.startsWith('expired') ? expiredHoldingTimeInMinutes : validHoldingTimeInMinutes; + }); + dataset.bids.forEach((bid) => { + bid.holdingTimeInMinutes = bid.id.startsWith('expired') ? expiredHoldingTimeInMinutes : validHoldingTimeInMinutes; + }); + }); + }); + + it('Get ids for pruning method test', async () => { + const idsForPruning = datasetPruningService.getIdsForPruning( + repackedDatasets, + importedPruningDelayInMinutes, + replicatedPruningDelayInMinutes, + ); + + assert.deepEqual(idsForPruning.dataInfoIdToBeDeleted, dataInfoIdForPruning, 'Wrong datainfo ids for pruning'); + assert.deepEqual(idsForPruning.offerIdToBeDeleted, offerIdForPruning, 'Wrong offer ids for pruning'); + assert.deepEqual(idsForPruning.datasetsToBeDeleted.map(e => e.datasetId), datasetsIdForPruning, 'Wrong dataset ids for pruning'); + }); +}); diff --git a/test/modules/utilities.test.js b/test/modules/utilities.test.js index 95df08d632..df251b00b2 100644 --- a/test/modules/utilities.test.js +++ b/test/modules/utilities.test.js @@ -22,7 +22,7 @@ describe('Utilities module', () => { 'control_port_enabled', 'control_port', 'control_sock_enabled', 'control_sock', 'onion_enabled', 'ssl_authority_paths', 'node_rpc_port', 'default_data_price', 'operational_db', 'remote_control_enabled', 'dc_challenge_retry_delay_in_millis', 'dh_challenge_retry_delay_in_millis', - 'read_stake_factor', 'send_logs_to_origintrail', + 'read_stake_factor', 'send_logs_to_origintrail', 'dataset_pruning', 'dh_min_reputation', 'dh_min_stake_amount', 'houston_password_file_name', 'is_bootstrap_node', 'houston_password', 'reverse_tunnel_address', 'reverse_tunnel_port', 'autoUpdater', 'bugSnag', 'network', 'dataSetStorage', 'dc_holding_time_in_minutes', 'dc_choose_time', 'dc_litigation_interval_in_minutes', From c7baeb8e86dc831000db2a1f196bff4f1ecc7343 Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Wed, 21 Apr 2021 09:17:43 +0200 Subject: [PATCH 13/21] Prevent the possibility of calculated offer price being a decimal value (#1513) --- modules/service/pricing-service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/service/pricing-service.js b/modules/service/pricing-service.js index 844bb7af04..16099d838a 100644 --- a/modules/service/pricing-service.js +++ b/modules/service/pricing-service.js @@ -35,7 +35,7 @@ class PricingService { const price = (2 * basePayoutCostInTrac) + (priceFactor * Math.sqrt(2 * holdingTimeInDays * dataSizeInMB)); - const finalPrice = price * 1000000000000000000; + const finalPrice = Math.ceil(price * 1000000000000000000); this.logger.trace(`Calculated offer price for data size: ${dataSizeInMB}MB, and holding time: ${holdingTimeInDays} days, PRICE: ${finalPrice}[mTRAC]`); return { finalPrice, tracInBaseCurrency, gasPriceInGwei }; } From 88f739f1c36544d093674ada4e34d13e7a7a0490 Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Wed, 21 Apr 2021 11:14:07 +0200 Subject: [PATCH 14/21] Handle offer payout if blockchain_id is not specified (#1518) --- modules/command/dh/dh-pay-out-command.js | 
13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/modules/command/dh/dh-pay-out-command.js b/modules/command/dh/dh-pay-out-command.js index f42d6d3d79..6ade2ea220 100644 --- a/modules/command/dh/dh-pay-out-command.js +++ b/modules/command/dh/dh-pay-out-command.js @@ -28,6 +28,9 @@ class DhPayOutCommand extends Command { const { offerId, urgent, + } = command.data; + + let { blockchain_id, } = command.data; @@ -38,6 +41,16 @@ class DhPayOutCommand extends Command { }, }); + + if (!blockchain_id) { + if (bid && bid.blockchain_id) { + // eslint-disable-next-line prefer-destructuring + blockchain_id = bid.blockchain_id; + } else { + this.logger.important(`Cannot determine blockchain_id for offer ${offerId}. Cannot execute payout.`); + return Command.empty(); + } + } + const blockchainIdentity = this.profileService.getIdentity(blockchain_id); if (!bid) { From 90ffb27d69572bff6e12c87b1623205b9a5c36a6 Mon Sep 17 00:00:00 2001 From: kotlarmilos Date: Wed, 21 Apr 2021 12:54:32 +0200 Subject: [PATCH 15/21] Add migration files to backup and restore scripts (#1507) * Add migrations to backup/restore script --- scripts/backup.js | 19 +++++++++++++++ scripts/restore.js | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/scripts/backup.js b/scripts/backup.js index bd2b242b5f..c352e4d1e9 100644 --- a/scripts/backup.js +++ b/scripts/backup.js @@ -95,6 +95,19 @@ function getDataFileNames() { return ['kademlia.crt', 'kademlia.key', 'houston.txt', 'system.db']; } +function getMigrationFileNames() { + const migrationFileNames = []; + const directoryPath = path.join(config.appDataPath, 'migrations'); + if (fs.existsSync(directoryPath)) { + // read the directory synchronously so the names are collected before returning + const files = fs.readdirSync(directoryPath); + files.forEach((file) => { + migrationFileNames.push(file); + }); + } + return migrationFileNames; +} + function moveFileFromNodeToBackup(fileName, nodeDir, backupDir, showErrors = true) { try { const source = path.join(nodeDir, fileName); @@ -129,6 +142,7 @@ function createBackupFolder() { console.log(`Creating ${argv.backup_directory}/${timestamp} directories...`); mkdirp.sync(`${argv.backup_directory}/${timestamp}`); + mkdirp.sync(`${argv.backup_directory}/${timestamp}/migrations`); return path.join(argv.backup_directory, timestamp); } @@ -140,6 +154,7 @@ function main() { const identityFiles = getIdentityFileNames(); const certs = getCertificateFileNames(); const dataFiles = getDataFileNames(); + const migrationFiles = getMigrationFileNames(); backupDir = createBackupFolder(); @@ -166,6 +181,10 @@ function main() { moveFileFromNodeToBackup(file, argv.certs, backupDir, false); } + for (const file of migrationFiles) { + moveFileFromNodeToBackup(file, path.join(config.appDataPath, 'migrations'), path.join(backupDir, 'migrations')); + } + console.log('Database export...'); if (config.database.password_file_name) { diff --git a/scripts/restore.js b/scripts/restore.js index 642cb21832..15c1bba958 100644 --- a/scripts/restore.js +++ b/scripts/restore.js @@ -3,6 +3,7 @@ const rc = require('rc'); const argv = require('minimist')(process.argv.slice(2)); const { exec, execSync } = require('child_process'); const path = require('path'); +const mkdirp = require('mkdirp'); require('dotenv').config(); const Blockchain = require('../modules/Blockchain'); @@ -71,6 +72,9 @@ class RestoreService { const identityFiles = this._getIdentityFileNames(); const certs = this._getCertificateFileNames(); const dataFiles = this._getDataFileNames(); + const migrationFiles = this._getMigrationFileNames(); 
+ + this._checkFolderStruct(); const res = this._moveFileFromBackupToNode( this.config_name, @@ -100,6 +104,10 @@ class RestoreService { this._moveFileFromBackupToNode(file, this.restore_directory, this.certs, false); } + for (const file of migrationFiles) { + this._moveFileFromBackupToNode(file, path.join(this.restore_directory, 'migrations'), path.join(this.config.appDataPath, 'migrations')); + } + this._moveDatabaseFromBackupToNode(); this.logger.log('***********************************************'); @@ -185,6 +193,30 @@ class RestoreService { return dataFileNames; } + + _getMigrationFileNames() { + const migrationFileNames = []; + const directoryPath = path.join(this.restore_directory, 'migrations'); + if (fs.existsSync(directoryPath)) { + fs.readdir(directoryPath, (err, files) => { + files.forEach((file) => { + migrationFileNames.push(file); + }); + }); + } + return migrationFileNames; + } + + _checkFolderStruct() { + if (fs.existsSync(path.join(this.config.appDataPath, 'migrations'))) { + console.log(`Directory ${path.join(this.config.appDataPath, 'migrations')} already exists.`); + return; + } + + console.log(`Creating ${this.config.appDataPath}/migrations directories...`); + mkdirp.sync(`${this.config.appDataPath}/migrations`); + } + _moveFileFromBackupToNode(fileName, backupDir, restoreDir, showErrors = true) { try { const source = path.join(backupDir, fileName); From 5612ea12c80b30e0a73b3a8838ec50435f3682de Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Wed, 21 Apr 2021 12:58:29 +0200 Subject: [PATCH 16/21] Add a delay to replication data send command (#1520) --- ...update-offers-add-replication-timestamp.js | 14 +++++ models/offers.js | 1 + .../command/dc/dc-offer-finalized-command.js | 1 + modules/command/dc/dc-offer-task-command.js | 7 ++- .../command/dc/dc-replication-send-command.js | 61 ++++++++++++------- modules/constants.js | 4 ++ modules/service/dc-service.js | 23 ++++--- 7 files changed, 77 insertions(+), 34 deletions(-) create mode 100644 migrations/202104201448123-update-offers-add-replication-timestamp.js diff --git a/migrations/202104201448123-update-offers-add-replication-timestamp.js b/migrations/202104201448123-update-offers-add-replication-timestamp.js new file mode 100644 index 0000000000..e8bfdf9de9 --- /dev/null +++ b/migrations/202104201448123-update-offers-add-replication-timestamp.js @@ -0,0 +1,14 @@ +module.exports = { + up: async (queryInterface, Sequelize) => { + await queryInterface.addColumn( + 'offers', + 'replication_start_timestamp', + { + type: Sequelize.STRING, + }, + ); + }, + down: async (queryInterface) => { + await queryInterface.removeColumn('offers', 'replication_start_timestamp'); + }, +}; diff --git a/models/offers.js b/models/offers.js index 4183d2c431..dcbd554a06 100644 --- a/models/offers.js +++ b/models/offers.js @@ -28,6 +28,7 @@ module.exports = (sequelize, DataTypes) => { price_factor_used_for_price_calculation: DataTypes.INTEGER, offer_finalize_transaction_hash: DataTypes.STRING(128), blockchain_id: DataTypes.STRING, + replication_start_timestamp: DataTypes.STRING, }, {}); offers.associate = (models) => { // associations can be defined here diff --git a/modules/command/dc/dc-offer-finalized-command.js b/modules/command/dc/dc-offer-finalized-command.js index 34e68aa328..22fc501aa3 100644 --- a/modules/command/dc/dc-offer-finalized-command.js +++ b/modules/command/dc/dc-offer-finalized-command.js @@ -18,6 +18,7 @@ class DcOfferFinalizedCommand extends Command { this.logger = ctx.logger; 
this.config = ctx.config; this.graphStorage = ctx.graphStorage; + this.dcService = ctx.dcService; this.challengeService = ctx.challengeService; this.replicationService = ctx.replicationService; this.remoteControl = ctx.remoteControl; diff --git a/modules/command/dc/dc-offer-task-command.js b/modules/command/dc/dc-offer-task-command.js index 8a30045fc5..3c2c418356 100644 --- a/modules/command/dc/dc-offer-task-command.js +++ b/modules/command/dc/dc-offer-task-command.js @@ -12,6 +12,7 @@ class DcOfferTaskCommand extends Command { constructor(ctx) { super(ctx); this.logger = ctx.logger; + this.dcService = ctx.dcService; this.replicationService = ctx.replicationService; this.remoteControl = ctx.remoteControl; this.errorNotificationService = ctx.errorNotificationService; @@ -58,9 +59,13 @@ class DcOfferTaskCommand extends Command { } offer.task = eventTask; offer.offer_id = eventOfferId; + offer.replication_start_timestamp = Date.now().toString(); offer.status = 'STARTED'; offer.message = 'Offer has been successfully started. Waiting for DHs...'; - await offer.save({ fields: ['task', 'offer_id', 'status', 'message'] }); + await offer.save({ + fields: ['task', 'offer_id', 'replication_start_timestamp', 'status', 'message'], + }); + this.remoteControl.offerUpdate({ id: internalOfferId, }); diff --git a/modules/command/dc/dc-replication-send-command.js b/modules/command/dc/dc-replication-send-command.js index 11d06a6554..d8ec8c1a2f 100644 --- a/modules/command/dc/dc-replication-send-command.js +++ b/modules/command/dc/dc-replication-send-command.js @@ -14,6 +14,7 @@ class DCReplicationSendCommand extends Command { this.logger = ctx.logger; this.transport = ctx.transport; this.blockchain = ctx.blockchain; + this.dcService = ctx.dcService; this.replicationService = ctx.replicationService; this.permissionedDataService = ctx.permissionedDataService; @@ -28,9 +29,25 @@ class DCReplicationSendCommand extends Command { */ async execute(command) { const { - internalOfferId, wallet, identity, dhIdentity, offerId, + internalOfferId, wallet, identity, dhIdentity, offerId, blockchainId, + replicationStartTime, } = command.data; + // Check if the replication window expired + if ((replicationStartTime + this.config.dc_choose_time) < Date.now()) { + this.logger.debug(`Too late to send the replication for offer ${offerId} to ${identity}.`); + return Command.empty(); + } + + const purposes = await this.blockchain + .getWalletPurposes(dhIdentity, wallet, blockchainId).response; + if (!purposes.includes('2')) { + const message = 'Wallet provided does not have the appropriate permissions set up for the given identity.'; + this.logger.warn(message); + // TODO Send some response to DH to avoid pointlessly waiting + return Command.empty(); + } + const usedDH = await Models.replicated_data.findOne({ where: { @@ -40,35 +57,33 @@ class DCReplicationSendCommand extends Command { offer_id: offerId, }, }); - - let colorNumber = Utilities.getRandomInt(2); - if (usedDH != null && usedDH.status === 'STARTED' && usedDH.color) { - colorNumber = usedDH.color; + if (usedDH) { + return Command.empty(); } + const colorNumber = Utilities.getRandomInt(2); + const color = this.replicationService.castNumberToColor(colorNumber); const offer = await Models.offers.findOne({ where: { id: internalOfferId } }); const replication = await this.replicationService.loadReplication(offer.id, color); - if (!usedDH) { - await Models.replicated_data.create({ - dh_id: identity, - dh_wallet: wallet.toLowerCase(), - dh_identity: dhIdentity.toLowerCase(), - 
offer_id: offer.offer_id, - litigation_private_key: replication.litigationPrivateKey, - litigation_public_key: replication.litigationPublicKey, - distribution_public_key: replication.distributionPublicKey, - distribution_private_key: replication.distributionPrivateKey, - distribution_epk_checksum: replication.distributionEpkChecksum, - litigation_root_hash: replication.litigationRootHash, - distribution_root_hash: replication.distributionRootHash, - distribution_epk: replication.distributionEpk, - status: 'STARTED', - color: colorNumber, - }); - } + await Models.replicated_data.create({ + dh_id: identity, + dh_wallet: wallet.toLowerCase(), + dh_identity: dhIdentity.toLowerCase(), + offer_id: offer.offer_id, + litigation_private_key: replication.litigationPrivateKey, + litigation_public_key: replication.litigationPublicKey, + distribution_public_key: replication.distributionPublicKey, + distribution_private_key: replication.distributionPrivateKey, + distribution_epk_checksum: replication.distributionEpkChecksum, + litigation_root_hash: replication.litigationRootHash, + distribution_root_hash: replication.distributionRootHash, + distribution_epk: replication.distributionEpk, + status: 'STARTED', + color: colorNumber, + }); const toSign = [ Utilities.denormalizeHex(new BN(replication.distributionEpkChecksum).toString('hex')), diff --git a/modules/constants.js b/modules/constants.js index 11738d13ed..50adcb126e 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -2,6 +2,10 @@ * @constant {number} DEFAULT_NUMBER_OF_HOLDERS - Number of data holders for a dataset */ exports.DEFAULT_NUMBER_OF_HOLDERS = 3; +/** + * @constant {number} DEFAULT_NUMBER_OF_HOLDERS - Number of data holders for a dataset + */ +exports.REPLICATION_MIN_DELAY_MILLS = 4 * 60 * 1000; /** * @constant {number} DEFAULT_CHALLENGE_NUMBER_OF_TESTS - Number of challenges per DH diff --git a/modules/service/dc-service.js b/modules/service/dc-service.js index 0266ce0055..2f07102aab 100644 --- a/modules/service/dc-service.js +++ b/modules/service/dc-service.js @@ -7,6 +7,7 @@ const models = require('../../models'); const constants = require('../constants'); const ImportUtilities = require('../ImportUtilities'); + class DCService { constructor(ctx) { this.transport = ctx.transport; @@ -266,15 +267,6 @@ class DCService { return; } - const purposes = await this.blockchain - .getWalletPurposes(dhIdentity, wallet, offer.blockchain_id).response; - if (!purposes.includes('2')) { - const message = 'Wallet provided does not have the appropriate permissions set up for the given identity.'; - this.logger.warn(message); - await this.transport.sendResponse(response, { status: 'fail', message }); - return; - } - const dhReputation = await this.getReputationForDh(dhIdentity); if (dhReputation.lt(new BN(this.config.dh_min_reputation))) { @@ -287,9 +279,17 @@ class DCService { if (async_enabled) { await this._sendReplicationAcknowledgement(offerId, identity, response); + const minDelay = + Math.min(constants.REPLICATION_MIN_DELAY_MILLS, this.config.dc_choose_time * 0.1); + const maxDelay = this.config.dc_choose_time * 0.9; + const randomDelay = Math.ceil(minDelay + (Math.random() * (maxDelay - minDelay))); + + const startTime = parseInt(offer.replication_start_timestamp, 10); + const adjustedDelay = (startTime - Date.now()) + randomDelay; + await this.commandExecutor.add({ name: 'dcReplicationSendCommand', - delay: 0, + delay: (adjustedDelay > 0 ? 
adjustedDelay : 0), data: { internalOfferId: offer.id, offerId, @@ -297,6 +297,8 @@ class DCService { identity, dhIdentity, response, + blockchainId: offer.blockchain_id, + replicationStartTime: startTime, }, transactional: false, }); @@ -405,6 +407,7 @@ class DCService { identity, dhIdentity, response, + replicationStartTime: parseInt(offer.replication_start_timestamp, 10), }, transactional: false, }); From c564832154b40a8fd7049625269142e878763104 Mon Sep 17 00:00:00 2001 From: Uros Kukic <33048701+Kuki145@users.noreply.github.com> Date: Wed, 21 Apr 2021 16:06:40 +0200 Subject: [PATCH 17/21] Fix missed temporary changes on ot-node (#1522) --- config/config.json | 4 ++-- modules/command/dc/dc-replication-send-command.js | 1 + modules/constants.js | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config/config.json b/config/config.json index 25d0160703..2bb85f4bb4 100644 --- a/config/config.json +++ b/config/config.json @@ -378,8 +378,8 @@ }, "autoUpdater": { "enabled": true, - "packageJsonUrl": "https://raw.githubusercontent.com/OriginTrail/ot-node/feature/dataset-pruning/package.json", - "archiveUrl": "https://github.com/OriginTrail/ot-node/archive/feature/dataset-pruning.zip" + "packageJsonUrl": "https://raw.githubusercontent.com/OriginTrail/ot-node/release/testnet/package.json", + "archiveUrl": "https://github.com/OriginTrail/ot-node/archive/release/testnet.zip" }, "dataSetStorage": "data_set_storage", "dc_holding_time_in_minutes": 60, diff --git a/modules/command/dc/dc-replication-send-command.js b/modules/command/dc/dc-replication-send-command.js index d8ec8c1a2f..730e814188 100644 --- a/modules/command/dc/dc-replication-send-command.js +++ b/modules/command/dc/dc-replication-send-command.js @@ -58,6 +58,7 @@ class DCReplicationSendCommand extends Command { }, }); if (usedDH) { + this.logger.warn(`Already sent replication data for offer ${offerId} to ${identity}`); return Command.empty(); } diff --git a/modules/constants.js b/modules/constants.js index 50adcb126e..327ef4c39c 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -3,7 +3,8 @@ */ exports.DEFAULT_NUMBER_OF_HOLDERS = 3; /** - * @constant {number} DEFAULT_NUMBER_OF_HOLDERS - Number of data holders for a dataset + * @constant {number} REPLICATION_MIN_DELAY_MILLS - Default minimum delay for replication sending + * The value could be lower if the 10% of the DC choose time is shorter */ exports.REPLICATION_MIN_DELAY_MILLS = 4 * 60 * 1000; From 7ba248802e884e62b8713c652e68304b97343a4d Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Thu, 22 Apr 2021 14:34:27 +0200 Subject: [PATCH 18/21] fixed issue with pruning command not able to delete datasets --- models/pruning_history.js | 5 ++++- modules/command/common/dataset-pruning-command.js | 3 ++- modules/worker/dataset-pruning-worker.js | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/models/pruning_history.js b/models/pruning_history.js index bedc105981..302675c1aa 100644 --- a/models/pruning_history.js +++ b/models/pruning_history.js @@ -10,7 +10,10 @@ module.exports = (sequelize, DataTypes) => { data_set_id: DataTypes.STRING, imported_timestamp: DataTypes.STRING, pruned_timestamp: DataTypes.STRING, - }, {}); + }, { + freezeTableName: true, + tableName: 'pruning_history', + }); pruning_history.associate = (models) => { // associations can be defined here }; diff --git a/modules/command/common/dataset-pruning-command.js b/modules/command/common/dataset-pruning-command.js index a1ccf66682..1ed4cc9bf9 100644 --- 
a/modules/command/common/dataset-pruning-command.js +++ b/modules/command/common/dataset-pruning-command.js @@ -54,7 +54,6 @@ class DatasetPruningCommand extends Command { datasetsToBeDeleted, } = response; if (datasetsToBeDeleted.length === 0) { - this.logger.trace('Found 0 datasets for pruning'); return; } await this.datasetPruningService.removeEntriesWithId('offers', offerIdToBeDeleted); @@ -75,6 +74,8 @@ class DatasetPruningCommand extends Command { delay: constants.DATASET_PRUNING_COMMAND_TIME_MILLS, period: constants.DATASET_PRUNING_COMMAND_TIME_MILLS, transactional: false, + data: { + }, }); } diff --git a/modules/worker/dataset-pruning-worker.js b/modules/worker/dataset-pruning-worker.js index 80c932fab3..0e74f9f7d3 100644 --- a/modules/worker/dataset-pruning-worker.js +++ b/modules/worker/dataset-pruning-worker.js @@ -11,6 +11,7 @@ process.on('message', async (data) => { } = JSON.parse(data); try { const graphStorage = new GraphStorage(selectedDatabase, logger); + await graphStorage.connect(); const datasetPruningService = new DatasetPruningService({ logger, graphStorage }); const idsForPruning = datasetPruningService .getIdsForPruning( From 1b494476f4fbf38c9ee08dfc65f3144b8714a73e Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Thu, 22 Apr 2021 14:36:47 +0200 Subject: [PATCH 19/21] Updated log when arangodb returns error while pruning --- modules/service/dataset-pruning-service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/service/dataset-pruning-service.js b/modules/service/dataset-pruning-service.js index e2dd3c0268..8822de4b04 100644 --- a/modules/service/dataset-pruning-service.js +++ b/modules/service/dataset-pruning-service.js @@ -123,7 +123,7 @@ class DatasetPruningService { // eslint-disable-next-line no-await-in-loop await this.graphStorage.removeDataset(dataset.datasetId); } catch (error) { - this.logger.error('Unable to prune dataset with id: ', dataset.datasetId); + this.logger.debug(`Unable to prune dataset with id: ${dataset.datasetId}. 
Error message: ${error.message}`); } } } From 313f16c850d748327a235e09b23b795fba4a4861 Mon Sep 17 00:00:00 2001 From: Djordje Kovacevic Date: Thu, 22 Apr 2021 15:26:59 +0200 Subject: [PATCH 20/21] Renamed pruning migration --- ...uning-history.js => 202104201448124-create-pruning-history.js} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename migrations/{202104151212123-create-pruning-history.js => 202104201448124-create-pruning-history.js} (100%) diff --git a/migrations/202104151212123-create-pruning-history.js b/migrations/202104201448124-create-pruning-history.js similarity index 100% rename from migrations/202104151212123-create-pruning-history.js rename to migrations/202104201448124-create-pruning-history.js From 82691afbdfa1a6bb9b2977e5669e467c68031b98 Mon Sep 17 00:00:00 2001 From: Uros Kukic Date: Fri, 23 Apr 2021 10:24:09 +0200 Subject: [PATCH 21/21] Fix how migrations directory is read during backup --- scripts/backup.js | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/scripts/backup.js b/scripts/backup.js index c352e4d1e9..398e394d9e 100644 --- a/scripts/backup.js +++ b/scripts/backup.js @@ -96,16 +96,11 @@ function getDataFileNames() { } function getMigrationFileNames() { - const migrationFileNames = []; const directoryPath = path.join(config.appDataPath, 'migrations'); if (fs.existsSync(directoryPath)) { - fs.readdir(directoryPath, (err, files) => { - files.forEach((file) => { - migrationFileNames.push(file); - }); - }); + return fs.readdirSync(directoryPath); } - return migrationFileNames; + return []; } function moveFileFromNodeToBackup(fileName, nodeDir, backupDir, showErrors = true) {
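
Note on the backup/restore migration handling: the original getMigrationFileNames (introduced in patch 15) built its list inside the callback-based fs.readdir, which returns before the callback ever runs, so callers always received an empty array and no migration files were actually copied. Patch 21 above switches backup.js to the synchronous listing. A minimal sketch of the race, using Node's standard fs API (the helper names here are illustrative, not from the codebase):

    const fs = require('fs');

    // Broken: fs.readdir only schedules the listing; the callback fires on a
    // later tick, after the function has already returned the empty array.
    function listFilesBroken(dir) {
        const names = [];
        fs.readdir(dir, (err, files) => {
            if (!err) files.forEach((file) => names.push(file)); // runs too late
        });
        return names; // always [] at this point
    }

    // Fixed (as in patch 21): readdirSync completes before returning.
    function listFilesFixed(dir) {
        return fs.existsSync(dir) ? fs.readdirSync(dir) : [];
    }

Note that _getMigrationFileNames in scripts/restore.js still uses the callback form after this series, so it presumably wants the same readdirSync treatment in a follow-up.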
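
Note on the replication delay (patch 16): the scheduling arithmetic is split between modules/constants.js and modules/service/dc-service.js. Pulled together, the intent is to send each replication at a random point between roughly 10% and 90% of the DC choose window, measured from the offer's replication start rather than from the moment the DH responded. A consolidated sketch (computeReplicationSendDelay is a hypothetical helper; dc_choose_time is assumed to be in milliseconds, as the surrounding code implies):

    const REPLICATION_MIN_DELAY_MILLS = 4 * 60 * 1000; // from modules/constants.js

    function computeReplicationSendDelay(dcChooseTime, replicationStartTimestamp) {
        // Earliest send: 10% of the choose window, capped at 4 minutes.
        const minDelay = Math.min(REPLICATION_MIN_DELAY_MILLS, dcChooseTime * 0.1);
        // Latest send: 90% of the choose window.
        const maxDelay = dcChooseTime * 0.9;
        const randomDelay = Math.ceil(minDelay + (Math.random() * (maxDelay - minDelay)));

        // Anchor to the offer's replication start so late DH responses
        // do not push the send past the choose window.
        const startTime = parseInt(replicationStartTimestamp, 10);
        const adjustedDelay = (startTime - Date.now()) + randomDelay;
        return adjustedDelay > 0 ? adjustedDelay : 0;
    }

dc-replication-send-command.js then re-checks on execution that replicationStartTime + dc_choose_time has not already passed, so a command queued near the end of the window is dropped rather than sent late.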