From 0551096016ce993e362c7c926938b8fb3313d33a Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Sun, 6 Mar 2022 18:30:39 +0100 Subject: [PATCH 01/14] get latest asset id from blockchain --- external/blazegraph-service.js | 23 +---------------------- external/graphdb-service.js | 26 ++------------------------ modules/controller/rpc-controller.js | 7 +++---- modules/service/data-service.js | 20 ++++++++++---------- modules/service/query-service.js | 8 +++++++- 5 files changed, 23 insertions(+), 61 deletions(-) diff --git a/external/blazegraph-service.js b/external/blazegraph-service.js index 5f3fcd214c..e86d5e7312 100644 --- a/external/blazegraph-service.js +++ b/external/blazegraph-service.js @@ -102,7 +102,6 @@ class BlazegraphService { } async resolve(uri) { - let isAsset = false; const query = `PREFIX schema: CONSTRUCT { ?s ?p ?o } WHERE { @@ -112,26 +111,6 @@ class BlazegraphService { }`; let nquads = await this.construct(query); - if (!nquads.length) { - const query = `PREFIX schema: - CONSTRUCT { ?s ?p ?o } - WHERE { - GRAPH ?g { ?s ?p ?o } - { - SELECT ?ng - WHERE { - ?ng schema:hasUALs "${uri}" ; - schema:hasTimestamp ?timestamp . - } - ORDER BY DESC(?timestamp) - LIMIT 1 - } - FILTER (?g = ?ng) . - }`; - nquads = await this.construct(query); - isAsset = true; - } - if (nquads.length) { nquads = nquads.toString(); nquads = nquads.split('\n'); @@ -140,7 +119,7 @@ class BlazegraphService { } else { nquads = null; } - return { nquads, isAsset }; + return nquads; } async transformBlankNodes(nquads) { diff --git a/external/graphdb-service.js b/external/graphdb-service.js index 75a633c605..54053fa897 100644 --- a/external/graphdb-service.js +++ b/external/graphdb-service.js @@ -117,7 +117,6 @@ class GraphdbService { } async resolve(uri) { - let isAsset = false; const query = `PREFIX schema: CONSTRUCT { ?s ?p ?o } WHERE { @@ -127,26 +126,6 @@ class GraphdbService { }`; let nquads = await this.construct(query); - if (!nquads.length) { - const query = `PREFIX schema: - CONSTRUCT { ?s ?p ?o } - WHERE { - GRAPH ?g { ?s ?p ?o } - { - SELECT ?ng - WHERE { - ?ng schema:hasUALs "${uri}" ; - schema:hasTimestamp ?timestamp . - } - ORDER BY DESC(?timestamp) - LIMIT 1 - } - FILTER (?g = ?ng) . - }`; - nquads = await this.construct(query); - isAsset = true; - } - if (nquads.length) { nquads = nquads.toString(); nquads = nquads.replace(/_:genid(.){37}/gm, '_:$1'); @@ -155,10 +134,9 @@ class GraphdbService { } else { nquads = null; } - return { nquads, isAsset }; + return nquads; } - async assertionsByAsset(uri) { const query = `PREFIX schema: SELECT ?assertionId ?issuer ?timestamp @@ -168,7 +146,7 @@ class GraphdbService { schema:hasIssuer ?issuer . } ORDER BY DESC(?timestamp)`; - let result = await this.execute(query); + const result = await this.execute(query); return JSON.parse(result).results.bindings; } diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js index a9c13c4c30..e30005bacb 100644 --- a/modules/controller/rpc-controller.js +++ b/modules/controller/rpc-controller.js @@ -203,9 +203,8 @@ class RpcController { isAsset = true; id = assertionId; } - const result = await this.dataService.resolve(id, true); - if (result && result.nquads) { - let {nquads} = result; + const nquads = await this.dataService.resolve(id, true); + if (nquads) { let assertion = await this.dataService.createAssertion(nquads); assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata)) assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type))) @@ -592,7 +591,7 @@ class RpcController { for (const assertionId of assertions) { const content = await this.dataService.resolve(assertionId); if (content) { - const rawNquads = content.nquads ? content.nquads : content.rdf; + const rawNquads = content.rdf ? content.rdf : content; const { nquads } = await this.dataService.createAssertion(rawNquads); const proofs = await this.validationService.getProofs(nquads, reqNquads); result.push({ assertionId, proofs }); diff --git a/modules/service/data-service.js b/modules/service/data-service.js index 5b196c37f3..c51f028346 100644 --- a/modules/service/data-service.js +++ b/modules/service/data-service.js @@ -109,14 +109,14 @@ class DataService { async resolve(id, localQuery = false, metadataOnly = false) { try { - let {nquads, isAsset} = await this.implementation.resolve(id); + let nquads = await this.implementation.resolve(id); if (!localQuery && nquads && nquads.find((x) => x.includes(`<${constants.DID_PREFIX}:${id}> "private" .`))) { return null; } if (metadataOnly) { nquads = nquads.filter((x) => x.startsWith(`<${constants.DID_PREFIX}:${id}>`)); } - return {nquads, isAsset}; + return nquads; } catch (e) { this.handleUnavailableTripleStoreError(e); } @@ -245,12 +245,12 @@ class DataService { for (let assertion of assertions) { const assertionId = assertion.assertionId = assertion.assertionId.value.replace(`${constants.DID_PREFIX}:`, ''); - assertion = await this.resolve(assertion.assertionId, localQuery, true); - if (!assertion) continue; + const nquads = await this.resolve(assertion.assertionId, localQuery, true); + if (!nquads) continue; if (localQuery) { - assertion = await this.createAssertion(assertion.nquads); + assertion = await this.createAssertion(nquads); let object = result.find((x) => x.type === assertion.jsonld.metadata.type && x.id === assertion.jsonld.metadata.UALs[0]); if (!object) { @@ -281,7 +281,7 @@ class DataService { object = { assertionId, node: this.networkService.getPeerId(), - nquads: assertion.nquads, + nquads, }; result.push(object); } @@ -303,11 +303,11 @@ class DataService { for (let assertion of assertions) { const assertionId = assertion.assertionId = assertion.assertionId.value.replace(`${constants.DID_PREFIX}:`, ''); - assertion = await this.resolve(assertion.assertionId, localQuery, true); - if (!assertion) continue; + const nquads = await this.resolve(assertion.assertionId, localQuery, true); + if (!nquads) continue; if (localQuery) { - assertion = await this.createAssertion(assertion.nquads); + assertion = await this.createAssertion(nquads); let object = result.find((x) => x.id === assertion.id); if (!object) { object = { @@ -324,7 +324,7 @@ class DataService { object = { assertionId, node: this.networkService.getPeerId(), - nquads: assertion.nquads, + nquads, }; result.push(object); } diff --git a/modules/service/query-service.js b/modules/service/query-service.js index 08f34cb82f..52ea1a0253 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -38,7 +38,13 @@ class QueryService { Id_operation: operationId, }); - const { nquads, isAsset } = await this.dataService.resolve(id); + let isAsset = false; + const { assertionId } = await this.blockchainService.getAssetProofs(id); + if (assertionId) { + isAsset = true; + id = assertionId; + } + const nquads = await this.dataService.resolve(id); if (nquads) { this.logger.info(`Number of n-quads retrieved from the database is ${nquads.length}`); } From 12df4da3023f4644e2715a18fa067cd0f0454ed1 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Mon, 7 Mar 2022 01:52:25 +0100 Subject: [PATCH 02/14] remove unnecessary variables --- modules/controller/rpc-controller.js | 5 ++--- modules/service/query-service.js | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js index e30005bacb..41cf1d5c0d 100644 --- a/modules/controller/rpc-controller.js +++ b/modules/controller/rpc-controller.js @@ -589,9 +589,8 @@ class RpcController { assertions = await this.dataService.findAssertions(reqNquads); } for (const assertionId of assertions) { - const content = await this.dataService.resolve(assertionId); - if (content) { - const rawNquads = content.rdf ? content.rdf : content; + const rawNquads = await this.dataService.resolve(assertionId); + if (rawNquads) { const { nquads } = await this.dataService.createAssertion(rawNquads); const proofs = await this.validationService.getProofs(nquads, reqNquads); result.push({ assertionId, proofs }); diff --git a/modules/service/query-service.js b/modules/service/query-service.js index 52ea1a0253..c87a6f11ec 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -110,7 +110,7 @@ class QueryService { } const rawNquads = assertion.nquads ? assertion.nquads : assertion.rdf; - const { jsonld, nquads } = await this.dataService.createAssertion(rawNquads); + const jsonld = await this.dataService.createAssertion(rawNquads); let object = handlerData.find((x) => x.type === jsonld.metadata.type && x.id === jsonld.metadata.UALs[0]) if (!object) { object = { From 0a874ae61c25ff4fedcdea6ed450330b1a933111 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Mon, 7 Mar 2022 13:21:31 +0100 Subject: [PATCH 03/14] add blockchainService instance to query-service --- modules/service/query-service.js | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/service/query-service.js b/modules/service/query-service.js index c87a6f11ec..2810744d74 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -6,6 +6,7 @@ class QueryService { this.logger = ctx.logger; this.networkService = ctx.networkService; this.validationService = ctx.validationService; + this.blockchainService = ctx.blockchainService; this.dataService = ctx.dataService; this.fileService = ctx.fileService; this.workerPool = ctx.workerPool; From 1c77c53f1fbd8fa8c1c77a5d31e6d2689f5f1ae4 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Mon, 7 Mar 2022 14:56:53 +0100 Subject: [PATCH 04/14] add 3 seconds timeout to resolve request --- modules/service/query-service.js | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/modules/service/query-service.js b/modules/service/query-service.js index 2810744d74..27ceb08d23 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -13,15 +13,29 @@ class QueryService { } async resolve(id, load, isAssetRequested, node) { - let result = await this.networkService.sendMessage('/resolve', id, node); - if (!result || (Array.isArray(result) && result[0] === "ack")) { + const resolvePromise = new Promise(async (resolve, reject) => { + const timer = setTimeout(() => { + resolve(null); + }, 3000); + + const result = await this.networkService.sendMessage('/resolve', id, node); + clearTimeout(timer); + resolve(result); + }); + + const result = await resolvePromise; + if (!result || (Array.isArray(result) && result[0] === 'ack')) { return null; } const { isAsset } = result; const rawNquads = result.nquads ? result.nquads : result; - let assertion = await this.dataService.createAssertion(rawNquads); - const status = await this.dataService.verifyAssertion(assertion.jsonld, assertion.nquads, {isAsset: isAssetRequested}); + const assertion = await this.dataService.createAssertion(rawNquads); + const status = await this.dataService.verifyAssertion( + assertion.jsonld, + assertion.nquads, + { isAsset: isAssetRequested }, + ); if (status && load) { await this.dataService.insert(rawNquads.join('\n'), `${constants.DID_PREFIX}:${assertion.jsonld.metadata.id}`); From b4cd5d680088afef0b7659e27322dc666d8748c9 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Mon, 7 Mar 2022 15:00:49 +0100 Subject: [PATCH 05/14] add todo comment to resolve conditional --- modules/service/data-service.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/service/data-service.js b/modules/service/data-service.js index c51f028346..28bfa4e4a6 100644 --- a/modules/service/data-service.js +++ b/modules/service/data-service.js @@ -110,6 +110,8 @@ class DataService { async resolve(id, localQuery = false, metadataOnly = false) { try { let nquads = await this.implementation.resolve(id); + + // TODO: add function for this conditional expr for increased readability if (!localQuery && nquads && nquads.find((x) => x.includes(`<${constants.DID_PREFIX}:${id}> "private" .`))) { return null; } From ef839765a681c3a51735cc70908cc8a5ed543235 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Mon, 7 Mar 2022 15:37:00 +0100 Subject: [PATCH 06/14] add resolve max time to constants --- modules/constants.js | 6 ++++++ modules/service/query-service.js | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/modules/constants.js b/modules/constants.js index b5dcd64194..cdb3d5844c 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -69,6 +69,12 @@ exports.TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; // 10 seconds */ exports.BLOCKCHAIN_QUEUE_LIMIT = 25000; +/** + * @constant {number} RESOLVE_MAX_TIME_MILLIS + * - Maximum time for resolve operation + */ +exports.RESOLVE_MAX_TIME_MILLIS = 3 * 1000; + /** * @constant {object} TRIPLE_STORE_IMPLEMENTATION - * Names of available triple store implementations diff --git a/modules/service/query-service.js b/modules/service/query-service.js index 27ceb08d23..1d728b4c32 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -16,7 +16,7 @@ class QueryService { const resolvePromise = new Promise(async (resolve, reject) => { const timer = setTimeout(() => { resolve(null); - }, 3000); + }, constants.RESOLVE_MAX_TIME_MILLIS); const result = await this.networkService.sendMessage('/resolve', id, node); clearTimeout(timer); From a7ca6abdc00cc76a362926717b9ce04375bb763a Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Tue, 8 Mar 2022 15:31:46 +0100 Subject: [PATCH 07/14] add rate limiting for DC and fix handleResolve --- external/libp2p-service.js | 6 ++--- .../command/publish/send-assertion-command.js | 16 +++++++++---- modules/constants.js | 23 +++++++++++++++++++ modules/controller/rpc-controller.js | 5 ++-- modules/service/publish-service.js | 17 +++++++++++++- modules/service/query-service.js | 14 +++-------- 6 files changed, 58 insertions(+), 23 deletions(-) diff --git a/external/libp2p-service.js b/external/libp2p-service.js index 8e1047fd78..1b49bfa452 100644 --- a/external/libp2p-service.js +++ b/external/libp2p-service.js @@ -180,7 +180,7 @@ class Libp2pService { ) } else { await pipe( - ['ack'], + [constants.NETWORK_RESPONSES.ACK], stream ) @@ -199,7 +199,7 @@ class Libp2pService { Event_name: constants.ERROR_TYPE.LIBP2P_HANDLE_MSG_ERROR, }); await pipe( - ['ack'], + [constants.NETWORK_RESPONSES.ACK], stream ) @@ -224,7 +224,7 @@ class Libp2pService { }, ) - if(response.toString() === 'ack') { + if(response.toString() === constants.NETWORK_RESPONSES.ACK) { return null; } diff --git a/modules/command/publish/send-assertion-command.js b/modules/command/publish/send-assertion-command.js index 89eb27f301..b60fb57afe 100644 --- a/modules/command/publish/send-assertion-command.js +++ b/modules/command/publish/send-assertion-command.js @@ -44,11 +44,17 @@ class SendAssertionCommand extends Command { } nodes = [...new Set(nodes)]; - for (const node of nodes) { - this.publishService.store({ id: assertion.id, nquads: nquads }, node).catch((e) => { - this.handleError(handlerId, e, `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`); - }); - } + const storePromises = nodes.map((node) => this.publishService + .store({ id: assertion.id, nquads }, node) + .catch((e) => { + this.handleError( + handlerId, + e, + `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`, + ); + })); + + await Promise.all(storePromises); await Models.handler_ids.update( { diff --git a/modules/constants.js b/modules/constants.js index cdb3d5844c..48fba099f6 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -75,6 +75,18 @@ exports.BLOCKCHAIN_QUEUE_LIMIT = 25000; */ exports.RESOLVE_MAX_TIME_MILLIS = 3 * 1000; +/** + * @constant {number} STORE_MAX_RETRIES + * - Maximum number of retries + */ +exports.STORE_MAX_RETRIES = 3; + +/** + * @constant {number} STORE_BUSY_REPEAT_INTERVAL_IN_MILLS + * - Wait interval between retries for sending store requests + */ +exports.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS = 2 * 1000; + /** * @constant {object} TRIPLE_STORE_IMPLEMENTATION - * Names of available triple store implementations @@ -84,6 +96,17 @@ exports.TRIPLE_STORE_IMPLEMENTATION = { GRAPHDB: 'GraphDB', }; +/** + * @constant {object} NETWORK_RESPONSES - + * Types of known network responses + */ +exports.NETWORK_RESPONSES = { + TRUE: true, + FALSE: false, + ACK: 'ack', + BUSY: 'busy', +}; + /** * @constant {object} ERROR_TYPE - * Types of errors supported diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js index 41cf1d5c0d..dfc24dea7c 100644 --- a/modules/controller/rpc-controller.js +++ b/modules/controller/rpc-controller.js @@ -234,9 +234,8 @@ class RpcController { nodes = [...new Set(nodes)]; for (const node of nodes) { try { - const result = await this.queryService.resolve(id, req.query.load, isAsset, node); - if (result) { - const {assertion} = result; + const assertion = await this.queryService.resolve(id, req.query.load, isAsset, node); + if (assertion) { assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata)) assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type))) response.push(isAsset ? { diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index 166b737623..00bead61c1 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -104,7 +104,18 @@ class PublishService { async store(assertion, node) { // await this.networkService.store(node, topic, {}); - return await this.networkService.sendMessage('/store', assertion, node); + let retries = 0; + let response = await this.networkService.sendMessage('/store', assertion, node); + while ( + response === constants.NETWORK_RESPONSES.BUSY + && retries < constants.STORE_MAX_RETRIES + ) { + retries += 1; + await this.sleepForMilliseconds(constants.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS); + response = await this.networkService.sendMessage('/store', assertion, node); + } + + return response; } async handleStore(data) { @@ -135,6 +146,10 @@ class PublishService { return status; } + + async sleepForMilliseconds(milliseconds) { + await new Promise((r) => setTimeout(r, milliseconds)); + } } module.exports = PublishService; diff --git a/modules/service/query-service.js b/modules/service/query-service.js index 1d728b4c32..fbda2a3849 100644 --- a/modules/service/query-service.js +++ b/modules/service/query-service.js @@ -6,7 +6,6 @@ class QueryService { this.logger = ctx.logger; this.networkService = ctx.networkService; this.validationService = ctx.validationService; - this.blockchainService = ctx.blockchainService; this.dataService = ctx.dataService; this.fileService = ctx.fileService; this.workerPool = ctx.workerPool; @@ -24,11 +23,10 @@ class QueryService { }); const result = await resolvePromise; - if (!result || (Array.isArray(result) && result[0] === 'ack')) { + if (!result || (Array.isArray(result) && result[0] === constants.NETWORK_RESPONSES.ACK)) { return null; } - const { isAsset } = result; const rawNquads = result.nquads ? result.nquads : result; const assertion = await this.dataService.createAssertion(rawNquads); const status = await this.dataService.verifyAssertion( @@ -41,7 +39,7 @@ class QueryService { await this.dataService.insert(rawNquads.join('\n'), `${constants.DID_PREFIX}:${assertion.jsonld.metadata.id}`); this.logger.info(`Assertion ${assertion.jsonld.metadata.id} has been successfully inserted`); } - return status ? { assertion, isAsset } : null; + return status ? assertion : null; } async handleResolve(id) { @@ -53,12 +51,6 @@ class QueryService { Id_operation: operationId, }); - let isAsset = false; - const { assertionId } = await this.blockchainService.getAssetProofs(id); - if (assertionId) { - isAsset = true; - id = assertionId; - } const nquads = await this.dataService.resolve(id); if (nquads) { this.logger.info(`Number of n-quads retrieved from the database is ${nquads.length}`); @@ -74,7 +66,7 @@ class QueryService { if (!nquads) { return null; } - return { nquads, isAsset }; + return nquads; } async search(data, node) { From 1d1388d5a2ba1cdc7e6143498df03ba1feec19f4 Mon Sep 17 00:00:00 2001 From: NZT48 Date: Tue, 8 Mar 2022 15:39:10 +0100 Subject: [PATCH 08/14] Queue for triple store requests implemented --- modules/constants.js | 6 ++ modules/service/data-service.js | 167 +++++++++++++++++++------------- ot-node.js | 4 +- 3 files changed, 110 insertions(+), 67 deletions(-) diff --git a/modules/constants.js b/modules/constants.js index cdb3d5844c..c2c850a9f8 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -63,6 +63,12 @@ exports.TRIPLE_STORE_CONNECT_MAX_RETRIES = 10; */ exports.TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; // 10 seconds +/** + * @constant {number} TRIPLE_STORE_QUEUE_LIMIT + * - Triple store queue limit + */ +exports.TRIPLE_STORE_QUEUE_LIMIT = 5000; + /** * @constant {number} BLOCKCHAIN_QUEUE_LIMIT * - Blockchain queue limit diff --git a/modules/service/data-service.js b/modules/service/data-service.js index 28bfa4e4a6..6f3bd59f71 100644 --- a/modules/service/data-service.js +++ b/modules/service/data-service.js @@ -1,4 +1,4 @@ -const {v1: uuidv1} = require('uuid'); +const { v1: uuidv1 } = require('uuid'); const N3 = require('n3'); const constants = require('../constants'); const GraphDB = require('../../external/graphdb-service'); @@ -14,8 +14,8 @@ class DataService { this.nodeService = ctx.nodeService; this.workerPool = ctx.workerPool; this.blockchainService = ctx.blockchainService; - this.N3Parser = new N3.Parser({format: 'N-Triples', baseIRI: 'http://schema.org/'}); - + this.tripleStoreQueue = ctx.tripleStoreQueue.promise(this, this.handleTripleStoreRequest, 1); + this.N3Parser = new N3.Parser({ format: 'N-Triples', baseIRI: 'http://schema.org/' }); } getName() { @@ -100,7 +100,8 @@ class DataService { async insert(data, assertionId) { try { - return this.implementation.insert(data, assertionId); + const result = await this.tripleStoreQueue.push({ operation: 'insert', data, assertionId }); + return result; } catch (e) { // TODO: Check situation when inserting data recieved from other node this.handleUnavailableTripleStoreError(e); @@ -109,7 +110,7 @@ class DataService { async resolve(id, localQuery = false, metadataOnly = false) { try { - let nquads = await this.implementation.resolve(id); + let nquads = await this.tripleStoreQueue.push({ operation: 'resolve', id }); // TODO: add function for this conditional expr for increased readability if (!localQuery && nquads && nquads.find((x) => x.includes(`<${constants.DID_PREFIX}:${id}> "private" .`))) { @@ -126,12 +127,12 @@ class DataService { async assertionsByAsset(id) { try { - let assertions = await this.implementation.assertionsByAsset(id); + const assertions = await this.tripleStoreQueue.push({ operation: 'assertionsByAsset', id }); - return assertions.map(x => ({ + return assertions.map((x) => ({ id: x.assertionId.value.slice(8), issuer: x.issuer.value, - timestamp: x.timestamp.value + timestamp: x.timestamp.value, })); } catch (e) { this.handleUnavailableTripleStoreError(e); @@ -240,7 +241,7 @@ class DataService { async searchByQuery(query, options, localQuery = false) { try { - const assertions = await this.implementation.findAssetsByKeyword(query, options, localQuery); + const assertions = await this.tripleStoreQueue.push({ operation: 'findAssetsByKeyword', query, options, localQuery }); if (!assertions) return null; const result = []; @@ -298,7 +299,7 @@ class DataService { async searchAssertions(query, options, localQuery = false) { try { - const assertions = await this.implementation.findAssertionsByKeyword(query, options, localQuery); + const assertions = await this.tripleStoreQueue.push({ operation: 'findAssertionsByKeyword', query, options, localQuery }); if (!assertions) return null; const result = []; @@ -343,7 +344,7 @@ class DataService { try { let assertions = []; for (const nquad of nquads) { - const result = await this.implementation.findAssertions(nquad); + const result = await this.tripleStoreQueue.push({ operation: 'findAssertions', nquad }); assertions = [...new Set(assertions.concat(result))]; } @@ -364,23 +365,23 @@ class DataService { }); try { switch (type) { - // case 'SELECT': - // result = this.implementation.execute(query); - // break; - case 'CONSTRUCT': - result = await this.implementation.construct(query); - result = result.toString(); - if (result) { - result = result.split('\n').filter((x) => x !== ''); - } else { - result = []; - } - break; - // case 'ASK': - // result = this.implementation.ask(query); - // break; - default: - throw Error('Query type not supported'); + // case 'SELECT': + // result = this.implementation.execute(query); + // break; + case 'CONSTRUCT': + result = await this.tripleStoreQueue.push({ operation: 'construct', query }); + result = result.toString(); + if (result) { + result = result.split('\n').filter((x) => x !== ''); + } else { + result = []; + } + break; + // case 'ASK': + // result = this.implementation.ask(query); + // break; + default: + throw Error('Query type not supported'); } const quads = []; await this.N3Parser.parse( @@ -422,42 +423,42 @@ class DataService { let frame; switch (type.toLowerCase()) { - case this.constants.GS1EPCIS: - context = { - '@context': [ - 'https://gs1.github.io/EPCIS/epcis-context.jsonld', - { - example: 'http://ns.example.com/epcis/', - }, - ], - }; - - frame = { - '@context': [ - 'https://gs1.github.io/EPCIS/epcis-context.jsonld', - { - example: 'http://ns.example.com/epcis/', - }, - ], - isA: 'EPCISDocument', - }; - break; - case this.constants.ERC721: - case this.constants.OTTELEMETRY: - context = { - "@context": "https://www.schema.org/" - } - frame = { - "@context": "https://www.schema.org/", - "@type": type - } - break; - default: - context = { - '@context': ['https://www.schema.org/'], - }; + case this.constants.GS1EPCIS: + context = { + '@context': [ + 'https://gs1.github.io/EPCIS/epcis-context.jsonld', + { + example: 'http://ns.example.com/epcis/', + }, + ], + }; - frame = {}; + frame = { + '@context': [ + 'https://gs1.github.io/EPCIS/epcis-context.jsonld', + { + example: 'http://ns.example.com/epcis/', + }, + ], + isA: 'EPCISDocument', + }; + break; + case this.constants.ERC721: + case this.constants.OTTELEMETRY: + context = { + "@context": "https://www.schema.org/" + }; + frame = { + "@context": "https://www.schema.org/", + "@type": type + }; + break; + default: + context = { + '@context': ['https://www.schema.org/'], + }; + + frame = {}; } const json = await this.workerPool.exec('fromNQuads', [nquads, context, frame]) @@ -498,8 +499,9 @@ class DataService { hasKeywords: assertion.metadata.keywords, }; - if (assertion.metadata.UALs) + if (assertion.metadata.UALs) { metadata.hasUALs = assertion.metadata.UALs; + } const result = await this.workerPool.exec('toNQuads', [metadata]); return result; @@ -518,7 +520,7 @@ class DataService { async extractMetadata(rdf) { return new Promise(async (accept, reject) => { - const parser = new N3.Parser({format: 'N-Triples', baseIRI: 'http://schema.org/'}); + const parser = new N3.Parser({ format: 'N-Triples', baseIRI: 'http://schema.org/' }); const result = { metadata: { keywords: [], @@ -600,6 +602,41 @@ class DataService { }); } + async handleTripleStoreRequest(args) { + if (this.tripleStoreQueue.length() > constants.TRIPLE_STORE_QUEUE_LIMIT) { + throw new Error('Triple store queue is full'); + } + const { operation } = args; + let result; + switch (operation) { + case 'insert': + result = await this.implementation.insert(args.data, args.assertionId); + break; + case 'resolve': + result = await this.implementation.resolve(args.id); + break; + case 'assertionsByAsset': + result = await this.implementation.assertionsByAsset(args.id); + break; + case 'findAssetsByKeyword': + result = await this.implementation.findAssetsByKeyword(args.query, args.options, args.localQuery); + break; + case 'findAssertionsByKeyword': + result = await this.implementation.findAssertionsByKeyword(args.query, args.options, args.localQuery); + break; + case 'construct': + result = await this.implementation.construct(args.query); + break; + case 'findAssertions': + result = await this.implementation.findAssertions(args.nquad); + break; + default: + throw new Error('Unknown operation for triple store'); + } + + return result; + } + handleUnavailableTripleStoreError(e) { if (e.code === 'ECONNREFUSED') { this.logger.error({ diff --git a/ot-node.js b/ot-node.js index 9d5dec8af7..a91ea3b923 100644 --- a/ot-node.js +++ b/ot-node.js @@ -3,14 +3,13 @@ const DeepExtend = require('deep-extend'); const AutoGitUpdate = require('auto-git-update'); const rc = require('rc'); const fs = require('fs'); +const queue = require('fastq'); const DependencyInjection = require('./modules/service/dependency-injection'); const Logger = require('./modules/logger/logger'); const constants = require('./modules/constants'); const db = require('./models'); const pjson = require('./package.json'); const configjson = require('./config/config.json'); -const queue = require('fastq') - class OTNode { constructor(config) { @@ -67,6 +66,7 @@ class OTNode { DependencyInjection.registerValue(this.container, 'logger', this.logger); DependencyInjection.registerValue(this.container, 'constants', constants); DependencyInjection.registerValue(this.container, 'blockchainQueue', queue); + DependencyInjection.registerValue(this.container, 'tripleStoreQueue', queue); this.logger.info('Dependency injection module is initialized'); } From 339f054ae1892035a986e00f89e8015d3e1805ba Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Tue, 8 Mar 2022 16:32:36 +0100 Subject: [PATCH 09/14] add express slow down and increase dialTimeout --- external/libp2p-service.js | 2 +- modules/controller/rpc-controller.js | 11 +++++++++-- package.json | 1 + 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/external/libp2p-service.js b/external/libp2p-service.js index 1b49bfa452..dc0101cdb0 100644 --- a/external/libp2p-service.js +++ b/external/libp2p-service.js @@ -27,7 +27,7 @@ const initializationObject = { dht: KadDHT, }, dialer: { - dialTimeout: 1e3, + dialTimeout: 2e3, }, config: { dht: { diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js index dfc24dea7c..ec9992906b 100644 --- a/modules/controller/rpc-controller.js +++ b/modules/controller/rpc-controller.js @@ -8,6 +8,7 @@ const path = require('path'); const { v1: uuidv1, v4: uuidv4 } = require('uuid'); const sortedStringify = require('json-stable-stringify'); const validator = require('validator'); +const slowDown = require('express-slow-down'); const Models = require('../../models/index'); const constants = require('../constants'); const pjson = require('../../package.json'); @@ -74,7 +75,7 @@ class RpcController { this.app.use((error, req, res, next) => { if (error instanceof IpDeniedError) { - return res.status(401).send('Access denied') + return res.status(401).send('Access denied'); } return next(); }); @@ -82,7 +83,13 @@ class RpcController { this.app.use((req, res, next) => { this.logger.info(`${req.method}: ${req.url} request received`); return next(); - }) + }); + + this.app.use(slowDown({ + windowMs: 1 * 60 * 1000, // 1 minute + delayAfter: 30, // allow 30 requests per 1 minute, then... + delayMs: 2 * 1000, // begin adding 2s of delay per request above 30; + })); } async initializeErrorMiddleware() { diff --git a/package.json b/package.json index 21565a5db3..ac4be0ddfc 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ "express": "^4.17.1", "express-fileupload": "^1.2.1", "express-ipfilter": "^1.2.0", + "express-slow-down": "^1.4.0", "fast-sort": "^3.1.1", "fastq": "^1.13.0", "fs-extra": "^10.0.0", From 9a9e85fa8e0b9c8a43d60148895cb806c1174d85 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Tue, 8 Mar 2022 16:39:23 +0100 Subject: [PATCH 10/14] removed unnecessary sleep function --- modules/service/publish-service.js | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index 00bead61c1..e0b88b7ee1 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -1,4 +1,5 @@ const { v1: uuidv1 } = require('uuid'); +const sleep = require('sleep-async')().Promise; const constants = require('../constants'); class PublishService { @@ -111,7 +112,7 @@ class PublishService { && retries < constants.STORE_MAX_RETRIES ) { retries += 1; - await this.sleepForMilliseconds(constants.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS); + await sleep.sleep(constants.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS); response = await this.networkService.sendMessage('/store', assertion, node); } @@ -146,10 +147,6 @@ class PublishService { return status; } - - async sleepForMilliseconds(milliseconds) { - await new Promise((r) => setTimeout(r, milliseconds)); - } } module.exports = PublishService; From 1997a722bbbb0428ffb5c9710429556945c89250 Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Tue, 8 Mar 2022 17:21:49 +0100 Subject: [PATCH 11/14] add busy messages in handleStore --- modules/constants.js | 6 ++++++ modules/service/data-service.js | 4 ++++ modules/service/publish-service.js | 3 +++ 3 files changed, 13 insertions(+) diff --git a/modules/constants.js b/modules/constants.js index c98108fff1..0273898748 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -93,6 +93,12 @@ exports.STORE_MAX_RETRIES = 3; */ exports.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS = 2 * 1000; +/** + * @constant {number} HANDLE_STORE_BUSINESS_LIMIT + * - Max number of operations in triple store queue that indicate business + */ +exports.HANDLE_STORE_BUSINESS_LIMIT = 20; + /** * @constant {object} TRIPLE_STORE_IMPLEMENTATION - * Names of available triple store implementations diff --git a/modules/service/data-service.js b/modules/service/data-service.js index 6f3bd59f71..cd5ee29cb9 100644 --- a/modules/service/data-service.js +++ b/modules/service/data-service.js @@ -18,6 +18,10 @@ class DataService { this.N3Parser = new N3.Parser({ format: 'N-Triples', baseIRI: 'http://schema.org/' }); } + getTripleStoreQueueLength() { + return this.tripleStoreQueue.length(); + } + getName() { return this.implementation.getName(); } diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index e0b88b7ee1..3288d646f0 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -121,6 +121,9 @@ class PublishService { async handleStore(data) { if (!data || data.rdf) return false; + if (this.dataService.getTripleStoreQueueLength() > constants.HANDLE_STORE_BUSINESS_LIMIT) { + return constants.NETWORK_RESPONSES.BUSY; + } const operationId = uuidv1(); this.logger.emit({ msg: 'Started measuring execution of handle store command', From 30dc6f6630874d74980bc73e175da555f4553783 Mon Sep 17 00:00:00 2001 From: NZT48 Date: Tue, 8 Mar 2022 18:23:04 +0100 Subject: [PATCH 12/14] Bump the version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index ac4be0ddfc..cef55ee28a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.1.28", + "version": "6.0.0-beta.1.29", "description": "OTNode v6 Beta 1", "main": "index.js", "scripts": { From 8cf6e47b19b3eab7ec161897c4f52dd5944bce86 Mon Sep 17 00:00:00 2001 From: NZT48 Date: Wed, 9 Mar 2022 14:02:00 +0100 Subject: [PATCH 13/14] Fix measurment and error handling for publish operation --- .../command/publish/send-assertion-command.js | 17 +++++++ modules/constants.js | 1 + modules/service/publish-service.js | 46 +++++++++++++------ 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/modules/command/publish/send-assertion-command.js b/modules/command/publish/send-assertion-command.js index b60fb57afe..adcdbf13a3 100644 --- a/modules/command/publish/send-assertion-command.js +++ b/modules/command/publish/send-assertion-command.js @@ -46,6 +46,23 @@ class SendAssertionCommand extends Command { const storePromises = nodes.map((node) => this.publishService .store({ id: assertion.id, nquads }, node) + .then((response) => { + if (!response) { + this.logger.error({ + msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node didn't stored the assertion.`, + Operation_name: 'Error', + Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR, + Id_operation: handlerId, + }); + } else if (response === 'busy') { + this.logger.error({ + msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node is busy to store.`, + Operation_name: 'Error', + Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR, + Id_operation: handlerId, + }); + } + }) .catch((e) => { this.handleError( handlerId, diff --git a/modules/constants.js b/modules/constants.js index 0273898748..758e1d38b7 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -139,6 +139,7 @@ exports.ERROR_TYPE = { PROOFS_ROUTE_ERROR: 'ProofsRouteError', RESULTS_ROUTE_ERROR: 'ResultsRouteError', NODE_INFO_ROUTE_ERROR: 'NodeInfoRouteError', + HANDLE_STORE_ERROR: 'HandleStoreError', EXTRACT_METADATA_ERROR: 'ExtractMetadataError', TRIPLE_STORE_UNAVAILABLE_ERROR: 'TripleStoreUnavailableError', TRIPLE_STORE_INSERT_ERROR: 'TripleStoreInsertError', diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js index 3288d646f0..344914f3af 100644 --- a/modules/service/publish-service.js +++ b/modules/service/publish-service.js @@ -86,7 +86,7 @@ class PublishService { sequence: commandSequence.slice(1), delay: 0, data: { - documentPath, handlerId, method, isTelemetry, + documentPath, handlerId, method, isTelemetry, operationId, }, transactional: false, }); @@ -132,23 +132,39 @@ class PublishService { Id_operation: operationId, }); - const { jsonld, nquads } = await this.dataService.createAssertion(data.nquads); - const status = await this.dataService.verifyAssertion(jsonld, nquads); + try { + const { jsonld, nquads } = await this.dataService.createAssertion(data.nquads); + const status = await this.dataService.verifyAssertion(jsonld, nquads); - // todo check root hash on the blockchain - if (status) { - await this.dataService.insert(data.nquads.join('\n'), `${constants.DID_PREFIX}:${data.id}`); - this.logger.info(`Assertion ${data.id} has been successfully inserted`); - } + // todo check root hash on the blockchain + if (status) { + await this.dataService.insert(data.nquads.join('\n'), `${constants.DID_PREFIX}:${data.id}`); + this.logger.info(`Assertion ${data.id} has been successfully inserted`); + } - this.logger.emit({ - msg: 'Finished measuring execution of handle store command', - Event_name: 'handle_store_end', - Operation_name: 'handle_store', - Id_operation: operationId, - }); + this.logger.emit({ + msg: 'Finished measuring execution of handle store command', + Event_name: 'handle_store_end', + Operation_name: 'handle_store', + Id_operation: operationId, + }); - return status; + return status; + } catch (e) { + this.logger.emit({ + msg: 'Finished measuring execution of handle store command', + Event_name: 'handle_store_end', + Operation_name: 'handle_store', + Id_operation: operationId, + }); + this.logger.error({ + msg: `Error while handling store: ${e} - ${e.stack}`, + Operation_name: 'Error', + Event_name: constants.ERROR_TYPE.HANDLE_STORE_ERROR, + Id_operation: operationId, + }); + return false; + } } } From 0c29d4ee8b5e61b317c5021219ca5cc73fcc654c Mon Sep 17 00:00:00 2001 From: NZT48 Date: Wed, 9 Mar 2022 14:05:51 +0100 Subject: [PATCH 14/14] Increase resolve timeout to 15s --- modules/constants.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/constants.js b/modules/constants.js index 758e1d38b7..4bdf852703 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -79,7 +79,7 @@ exports.BLOCKCHAIN_QUEUE_LIMIT = 25000; * @constant {number} RESOLVE_MAX_TIME_MILLIS * - Maximum time for resolve operation */ -exports.RESOLVE_MAX_TIME_MILLIS = 3 * 1000; +exports.RESOLVE_MAX_TIME_MILLIS = 15 * 1000; /** * @constant {number} STORE_MAX_RETRIES