diff --git a/external/blazegraph-service.js b/external/blazegraph-service.js
index 5f3fcd214c..e86d5e7312 100644
--- a/external/blazegraph-service.js
+++ b/external/blazegraph-service.js
@@ -102,7 +102,6 @@ class BlazegraphService {
}
async resolve(uri) {
- let isAsset = false;
const query = `PREFIX schema:
CONSTRUCT { ?s ?p ?o }
WHERE {
@@ -112,26 +111,6 @@ class BlazegraphService {
}`;
let nquads = await this.construct(query);
- if (!nquads.length) {
- const query = `PREFIX schema:
- CONSTRUCT { ?s ?p ?o }
- WHERE {
- GRAPH ?g { ?s ?p ?o }
- {
- SELECT ?ng
- WHERE {
- ?ng schema:hasUALs "${uri}" ;
- schema:hasTimestamp ?timestamp .
- }
- ORDER BY DESC(?timestamp)
- LIMIT 1
- }
- FILTER (?g = ?ng) .
- }`;
- nquads = await this.construct(query);
- isAsset = true;
- }
-
if (nquads.length) {
nquads = nquads.toString();
nquads = nquads.split('\n');
@@ -140,7 +119,7 @@ class BlazegraphService {
} else {
nquads = null;
}
- return { nquads, isAsset };
+ return nquads;
}
async transformBlankNodes(nquads) {
diff --git a/external/graphdb-service.js b/external/graphdb-service.js
index 75a633c605..54053fa897 100644
--- a/external/graphdb-service.js
+++ b/external/graphdb-service.js
@@ -117,7 +117,6 @@ class GraphdbService {
}
async resolve(uri) {
- let isAsset = false;
const query = `PREFIX schema:
CONSTRUCT { ?s ?p ?o }
WHERE {
@@ -127,26 +126,6 @@ class GraphdbService {
}`;
let nquads = await this.construct(query);
- if (!nquads.length) {
- const query = `PREFIX schema:
- CONSTRUCT { ?s ?p ?o }
- WHERE {
- GRAPH ?g { ?s ?p ?o }
- {
- SELECT ?ng
- WHERE {
- ?ng schema:hasUALs "${uri}" ;
- schema:hasTimestamp ?timestamp .
- }
- ORDER BY DESC(?timestamp)
- LIMIT 1
- }
- FILTER (?g = ?ng) .
- }`;
- nquads = await this.construct(query);
- isAsset = true;
- }
-
if (nquads.length) {
nquads = nquads.toString();
nquads = nquads.replace(/_:genid(.){37}/gm, '_:$1');
@@ -155,10 +134,9 @@ class GraphdbService {
} else {
nquads = null;
}
- return { nquads, isAsset };
+ return nquads;
}
-
async assertionsByAsset(uri) {
const query = `PREFIX schema:
SELECT ?assertionId ?issuer ?timestamp
@@ -168,7 +146,7 @@ class GraphdbService {
schema:hasIssuer ?issuer .
}
ORDER BY DESC(?timestamp)`;
- let result = await this.execute(query);
+ const result = await this.execute(query);
return JSON.parse(result).results.bindings;
}
diff --git a/external/libp2p-service.js b/external/libp2p-service.js
index 8e1047fd78..dc0101cdb0 100644
--- a/external/libp2p-service.js
+++ b/external/libp2p-service.js
@@ -27,7 +27,7 @@ const initializationObject = {
dht: KadDHT,
},
dialer: {
- dialTimeout: 1e3,
+ dialTimeout: 2e3,
},
config: {
dht: {
@@ -180,7 +180,7 @@ class Libp2pService {
)
} else {
await pipe(
- ['ack'],
+ [constants.NETWORK_RESPONSES.ACK],
stream
)
@@ -199,7 +199,7 @@ class Libp2pService {
Event_name: constants.ERROR_TYPE.LIBP2P_HANDLE_MSG_ERROR,
});
await pipe(
- ['ack'],
+ [constants.NETWORK_RESPONSES.ACK],
stream
)
@@ -224,7 +224,7 @@ class Libp2pService {
},
)
- if(response.toString() === 'ack') {
+ if(response.toString() === constants.NETWORK_RESPONSES.ACK) {
return null;
}
diff --git a/modules/command/publish/send-assertion-command.js b/modules/command/publish/send-assertion-command.js
index 89eb27f301..adcdbf13a3 100644
--- a/modules/command/publish/send-assertion-command.js
+++ b/modules/command/publish/send-assertion-command.js
@@ -44,11 +44,34 @@ class SendAssertionCommand extends Command {
}
nodes = [...new Set(nodes)];
- for (const node of nodes) {
- this.publishService.store({ id: assertion.id, nquads: nquads }, node).catch((e) => {
- this.handleError(handlerId, e, `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`);
- });
- }
+ const storePromises = nodes.map((node) => this.publishService
+ .store({ id: assertion.id, nquads }, node)
+ .then((response) => {
+ if (!response) {
+ this.logger.error({
+ msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node didn't stored the assertion.`,
+ Operation_name: 'Error',
+ Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR,
+ Id_operation: handlerId,
+ });
+ } else if (response === 'busy') {
+ this.logger.error({
+ msg: `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String} - receiving node is busy to store.`,
+ Operation_name: 'Error',
+ Event_name: constants.ERROR_TYPE.SEND_ASSERTION_ERROR,
+ Id_operation: handlerId,
+ });
+ }
+ })
+ .catch((e) => {
+ this.handleError(
+ handlerId,
+ e,
+ `Error while sending data with assertion id ${assertion.id} to node ${node._idB58String}. Error message: ${e.message}. ${e.stack}`,
+ );
+ }));
+
+ await Promise.all(storePromises);
await Models.handler_ids.update(
{
diff --git a/modules/constants.js b/modules/constants.js
index b5dcd64194..4bdf852703 100644
--- a/modules/constants.js
+++ b/modules/constants.js
@@ -63,12 +63,42 @@ exports.TRIPLE_STORE_CONNECT_MAX_RETRIES = 10;
*/
exports.TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; // 10 seconds
+/**
+ * @constant {number} TRIPLE_STORE_QUEUE_LIMIT
+ * - Triple store queue limit
+ */
+exports.TRIPLE_STORE_QUEUE_LIMIT = 5000;
+
/**
* @constant {number} BLOCKCHAIN_QUEUE_LIMIT
* - Blockchain queue limit
*/
exports.BLOCKCHAIN_QUEUE_LIMIT = 25000;
+/**
+ * @constant {number} RESOLVE_MAX_TIME_MILLIS
+ * - Maximum time for resolve operation
+ */
+exports.RESOLVE_MAX_TIME_MILLIS = 15 * 1000;
+
+/**
+ * @constant {number} STORE_MAX_RETRIES
+ * - Maximum number of retries
+ */
+exports.STORE_MAX_RETRIES = 3;
+
+/**
+ * @constant {number} STORE_BUSY_REPEAT_INTERVAL_IN_MILLS
+ * - Wait interval between retries for sending store requests
+ */
+exports.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS = 2 * 1000;
+
+/**
+ * @constant {number} HANDLE_STORE_BUSINESS_LIMIT
+ * - Max number of operations in triple store queue that indicate business
+ */
+exports.HANDLE_STORE_BUSINESS_LIMIT = 20;
+
/**
* @constant {object} TRIPLE_STORE_IMPLEMENTATION -
* Names of available triple store implementations
@@ -78,6 +108,17 @@ exports.TRIPLE_STORE_IMPLEMENTATION = {
GRAPHDB: 'GraphDB',
};
+/**
+ * @constant {object} NETWORK_RESPONSES -
+ * Types of known network responses
+ */
+exports.NETWORK_RESPONSES = {
+ TRUE: true,
+ FALSE: false,
+ ACK: 'ack',
+ BUSY: 'busy',
+};
+
/**
* @constant {object} ERROR_TYPE -
* Types of errors supported
@@ -98,6 +139,7 @@ exports.ERROR_TYPE = {
PROOFS_ROUTE_ERROR: 'ProofsRouteError',
RESULTS_ROUTE_ERROR: 'ResultsRouteError',
NODE_INFO_ROUTE_ERROR: 'NodeInfoRouteError',
+ HANDLE_STORE_ERROR: 'HandleStoreError',
EXTRACT_METADATA_ERROR: 'ExtractMetadataError',
TRIPLE_STORE_UNAVAILABLE_ERROR: 'TripleStoreUnavailableError',
TRIPLE_STORE_INSERT_ERROR: 'TripleStoreInsertError',
diff --git a/modules/controller/rpc-controller.js b/modules/controller/rpc-controller.js
index a9c13c4c30..ec9992906b 100644
--- a/modules/controller/rpc-controller.js
+++ b/modules/controller/rpc-controller.js
@@ -8,6 +8,7 @@ const path = require('path');
const { v1: uuidv1, v4: uuidv4 } = require('uuid');
const sortedStringify = require('json-stable-stringify');
const validator = require('validator');
+const slowDown = require('express-slow-down');
const Models = require('../../models/index');
const constants = require('../constants');
const pjson = require('../../package.json');
@@ -74,7 +75,7 @@ class RpcController {
this.app.use((error, req, res, next) => {
if (error instanceof IpDeniedError) {
- return res.status(401).send('Access denied')
+ return res.status(401).send('Access denied');
}
return next();
});
@@ -82,7 +83,13 @@ class RpcController {
this.app.use((req, res, next) => {
this.logger.info(`${req.method}: ${req.url} request received`);
return next();
- })
+ });
+
+ this.app.use(slowDown({
+ windowMs: 1 * 60 * 1000, // 1 minute
+ delayAfter: 30, // allow 30 requests per 1 minute, then...
+ delayMs: 2 * 1000, // begin adding 2s of delay per request above 30;
+ }));
}
async initializeErrorMiddleware() {
@@ -203,9 +210,8 @@ class RpcController {
isAsset = true;
id = assertionId;
}
- const result = await this.dataService.resolve(id, true);
- if (result && result.nquads) {
- let {nquads} = result;
+ const nquads = await this.dataService.resolve(id, true);
+ if (nquads) {
let assertion = await this.dataService.createAssertion(nquads);
assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata))
assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type)))
@@ -235,9 +241,8 @@ class RpcController {
nodes = [...new Set(nodes)];
for (const node of nodes) {
try {
- const result = await this.queryService.resolve(id, req.query.load, isAsset, node);
- if (result) {
- const {assertion} = result;
+ const assertion = await this.queryService.resolve(id, req.query.load, isAsset, node);
+ if (assertion) {
assertion.jsonld.metadata = JSON.parse(sortedStringify(assertion.jsonld.metadata))
assertion.jsonld.data = JSON.parse(sortedStringify(await this.dataService.fromNQuads(assertion.jsonld.data, assertion.jsonld.metadata.type)))
response.push(isAsset ? {
@@ -590,9 +595,8 @@ class RpcController {
assertions = await this.dataService.findAssertions(reqNquads);
}
for (const assertionId of assertions) {
- const content = await this.dataService.resolve(assertionId);
- if (content) {
- const rawNquads = content.nquads ? content.nquads : content.rdf;
+ const rawNquads = await this.dataService.resolve(assertionId);
+ if (rawNquads) {
const { nquads } = await this.dataService.createAssertion(rawNquads);
const proofs = await this.validationService.getProofs(nquads, reqNquads);
result.push({ assertionId, proofs });
diff --git a/modules/service/data-service.js b/modules/service/data-service.js
index 5b196c37f3..cd5ee29cb9 100644
--- a/modules/service/data-service.js
+++ b/modules/service/data-service.js
@@ -1,4 +1,4 @@
-const {v1: uuidv1} = require('uuid');
+const { v1: uuidv1 } = require('uuid');
const N3 = require('n3');
const constants = require('../constants');
const GraphDB = require('../../external/graphdb-service');
@@ -14,8 +14,12 @@ class DataService {
this.nodeService = ctx.nodeService;
this.workerPool = ctx.workerPool;
this.blockchainService = ctx.blockchainService;
- this.N3Parser = new N3.Parser({format: 'N-Triples', baseIRI: 'http://schema.org/'});
+ this.tripleStoreQueue = ctx.tripleStoreQueue.promise(this, this.handleTripleStoreRequest, 1);
+ this.N3Parser = new N3.Parser({ format: 'N-Triples', baseIRI: 'http://schema.org/' });
+ }
+ getTripleStoreQueueLength() {
+ return this.tripleStoreQueue.length();
}
getName() {
@@ -100,7 +104,8 @@ class DataService {
async insert(data, assertionId) {
try {
- return this.implementation.insert(data, assertionId);
+ const result = await this.tripleStoreQueue.push({ operation: 'insert', data, assertionId });
+ return result;
} catch (e) {
// TODO: Check situation when inserting data recieved from other node
this.handleUnavailableTripleStoreError(e);
@@ -109,14 +114,16 @@ class DataService {
async resolve(id, localQuery = false, metadataOnly = false) {
try {
- let {nquads, isAsset} = await this.implementation.resolve(id);
+ let nquads = await this.tripleStoreQueue.push({ operation: 'resolve', id });
+
+ // TODO: add function for this conditional expr for increased readability
if (!localQuery && nquads && nquads.find((x) => x.includes(`<${constants.DID_PREFIX}:${id}> "private" .`))) {
return null;
}
if (metadataOnly) {
nquads = nquads.filter((x) => x.startsWith(`<${constants.DID_PREFIX}:${id}>`));
}
- return {nquads, isAsset};
+ return nquads;
} catch (e) {
this.handleUnavailableTripleStoreError(e);
}
@@ -124,12 +131,12 @@ class DataService {
async assertionsByAsset(id) {
try {
- let assertions = await this.implementation.assertionsByAsset(id);
+ const assertions = await this.tripleStoreQueue.push({ operation: 'assertionsByAsset', id });
- return assertions.map(x => ({
+ return assertions.map((x) => ({
id: x.assertionId.value.slice(8),
issuer: x.issuer.value,
- timestamp: x.timestamp.value
+ timestamp: x.timestamp.value,
}));
} catch (e) {
this.handleUnavailableTripleStoreError(e);
@@ -238,19 +245,19 @@ class DataService {
async searchByQuery(query, options, localQuery = false) {
try {
- const assertions = await this.implementation.findAssetsByKeyword(query, options, localQuery);
+ const assertions = await this.tripleStoreQueue.push({ operation: 'findAssetsByKeyword', query, options, localQuery });
if (!assertions) return null;
const result = [];
for (let assertion of assertions) {
const assertionId = assertion.assertionId = assertion.assertionId.value.replace(`${constants.DID_PREFIX}:`, '');
- assertion = await this.resolve(assertion.assertionId, localQuery, true);
- if (!assertion) continue;
+ const nquads = await this.resolve(assertion.assertionId, localQuery, true);
+ if (!nquads) continue;
if (localQuery) {
- assertion = await this.createAssertion(assertion.nquads);
+ assertion = await this.createAssertion(nquads);
let object = result.find((x) => x.type === assertion.jsonld.metadata.type && x.id === assertion.jsonld.metadata.UALs[0]);
if (!object) {
@@ -281,7 +288,7 @@ class DataService {
object = {
assertionId,
node: this.networkService.getPeerId(),
- nquads: assertion.nquads,
+ nquads,
};
result.push(object);
}
@@ -296,18 +303,18 @@ class DataService {
async searchAssertions(query, options, localQuery = false) {
try {
- const assertions = await this.implementation.findAssertionsByKeyword(query, options, localQuery);
+ const assertions = await this.tripleStoreQueue.push({ operation: 'findAssertionsByKeyword', query, options, localQuery });
if (!assertions) return null;
const result = [];
for (let assertion of assertions) {
const assertionId = assertion.assertionId = assertion.assertionId.value.replace(`${constants.DID_PREFIX}:`, '');
- assertion = await this.resolve(assertion.assertionId, localQuery, true);
- if (!assertion) continue;
+ const nquads = await this.resolve(assertion.assertionId, localQuery, true);
+ if (!nquads) continue;
if (localQuery) {
- assertion = await this.createAssertion(assertion.nquads);
+ assertion = await this.createAssertion(nquads);
let object = result.find((x) => x.id === assertion.id);
if (!object) {
object = {
@@ -324,7 +331,7 @@ class DataService {
object = {
assertionId,
node: this.networkService.getPeerId(),
- nquads: assertion.nquads,
+ nquads,
};
result.push(object);
}
@@ -341,7 +348,7 @@ class DataService {
try {
let assertions = [];
for (const nquad of nquads) {
- const result = await this.implementation.findAssertions(nquad);
+ const result = await this.tripleStoreQueue.push({ operation: 'findAssertions', nquad });
assertions = [...new Set(assertions.concat(result))];
}
@@ -362,23 +369,23 @@ class DataService {
});
try {
switch (type) {
- // case 'SELECT':
- // result = this.implementation.execute(query);
- // break;
- case 'CONSTRUCT':
- result = await this.implementation.construct(query);
- result = result.toString();
- if (result) {
- result = result.split('\n').filter((x) => x !== '');
- } else {
- result = [];
- }
- break;
- // case 'ASK':
- // result = this.implementation.ask(query);
- // break;
- default:
- throw Error('Query type not supported');
+ // case 'SELECT':
+ // result = this.implementation.execute(query);
+ // break;
+ case 'CONSTRUCT':
+ result = await this.tripleStoreQueue.push({ operation: 'construct', query });
+ result = result.toString();
+ if (result) {
+ result = result.split('\n').filter((x) => x !== '');
+ } else {
+ result = [];
+ }
+ break;
+ // case 'ASK':
+ // result = this.implementation.ask(query);
+ // break;
+ default:
+ throw Error('Query type not supported');
}
const quads = [];
await this.N3Parser.parse(
@@ -420,42 +427,42 @@ class DataService {
let
frame;
switch (type.toLowerCase()) {
- case this.constants.GS1EPCIS:
- context = {
- '@context': [
- 'https://gs1.github.io/EPCIS/epcis-context.jsonld',
- {
- example: 'http://ns.example.com/epcis/',
- },
- ],
- };
-
- frame = {
- '@context': [
- 'https://gs1.github.io/EPCIS/epcis-context.jsonld',
- {
- example: 'http://ns.example.com/epcis/',
- },
- ],
- isA: 'EPCISDocument',
- };
- break;
- case this.constants.ERC721:
- case this.constants.OTTELEMETRY:
- context = {
- "@context": "https://www.schema.org/"
- }
- frame = {
- "@context": "https://www.schema.org/",
- "@type": type
- }
- break;
- default:
- context = {
- '@context': ['https://www.schema.org/'],
- };
+ case this.constants.GS1EPCIS:
+ context = {
+ '@context': [
+ 'https://gs1.github.io/EPCIS/epcis-context.jsonld',
+ {
+ example: 'http://ns.example.com/epcis/',
+ },
+ ],
+ };
+
+ frame = {
+ '@context': [
+ 'https://gs1.github.io/EPCIS/epcis-context.jsonld',
+ {
+ example: 'http://ns.example.com/epcis/',
+ },
+ ],
+ isA: 'EPCISDocument',
+ };
+ break;
+ case this.constants.ERC721:
+ case this.constants.OTTELEMETRY:
+ context = {
+ "@context": "https://www.schema.org/"
+ };
+ frame = {
+ "@context": "https://www.schema.org/",
+ "@type": type
+ };
+ break;
+ default:
+ context = {
+ '@context': ['https://www.schema.org/'],
+ };
- frame = {};
+ frame = {};
}
const json = await this.workerPool.exec('fromNQuads', [nquads, context, frame])
@@ -496,8 +503,9 @@ class DataService {
hasKeywords: assertion.metadata.keywords,
};
- if (assertion.metadata.UALs)
+ if (assertion.metadata.UALs) {
metadata.hasUALs = assertion.metadata.UALs;
+ }
const result = await this.workerPool.exec('toNQuads', [metadata]);
return result;
@@ -516,7 +524,7 @@ class DataService {
async extractMetadata(rdf) {
return new Promise(async (accept, reject) => {
- const parser = new N3.Parser({format: 'N-Triples', baseIRI: 'http://schema.org/'});
+ const parser = new N3.Parser({ format: 'N-Triples', baseIRI: 'http://schema.org/' });
const result = {
metadata: {
keywords: [],
@@ -598,6 +606,41 @@ class DataService {
});
}
+ async handleTripleStoreRequest(args) {
+ if (this.tripleStoreQueue.length() > constants.TRIPLE_STORE_QUEUE_LIMIT) {
+ throw new Error('Triple store queue is full');
+ }
+ const { operation } = args;
+ let result;
+ switch (operation) {
+ case 'insert':
+ result = await this.implementation.insert(args.data, args.assertionId);
+ break;
+ case 'resolve':
+ result = await this.implementation.resolve(args.id);
+ break;
+ case 'assertionsByAsset':
+ result = await this.implementation.assertionsByAsset(args.id);
+ break;
+ case 'findAssetsByKeyword':
+ result = await this.implementation.findAssetsByKeyword(args.query, args.options, args.localQuery);
+ break;
+ case 'findAssertionsByKeyword':
+ result = await this.implementation.findAssertionsByKeyword(args.query, args.options, args.localQuery);
+ break;
+ case 'construct':
+ result = await this.implementation.construct(args.query);
+ break;
+ case 'findAssertions':
+ result = await this.implementation.findAssertions(args.nquad);
+ break;
+ default:
+ throw new Error('Unknown operation for triple store');
+ }
+
+ return result;
+ }
+
handleUnavailableTripleStoreError(e) {
if (e.code === 'ECONNREFUSED') {
this.logger.error({
diff --git a/modules/service/publish-service.js b/modules/service/publish-service.js
index 166b737623..344914f3af 100644
--- a/modules/service/publish-service.js
+++ b/modules/service/publish-service.js
@@ -1,4 +1,5 @@
const { v1: uuidv1 } = require('uuid');
+const sleep = require('sleep-async')().Promise;
const constants = require('../constants');
class PublishService {
@@ -85,7 +86,7 @@ class PublishService {
sequence: commandSequence.slice(1),
delay: 0,
data: {
- documentPath, handlerId, method, isTelemetry,
+ documentPath, handlerId, method, isTelemetry, operationId,
},
transactional: false,
});
@@ -104,11 +105,25 @@ class PublishService {
async store(assertion, node) {
// await this.networkService.store(node, topic, {});
- return await this.networkService.sendMessage('/store', assertion, node);
+ let retries = 0;
+ let response = await this.networkService.sendMessage('/store', assertion, node);
+ while (
+ response === constants.NETWORK_RESPONSES.BUSY
+ && retries < constants.STORE_MAX_RETRIES
+ ) {
+ retries += 1;
+ await sleep.sleep(constants.STORE_BUSY_REPEAT_INTERVAL_IN_MILLS);
+ response = await this.networkService.sendMessage('/store', assertion, node);
+ }
+
+ return response;
}
async handleStore(data) {
if (!data || data.rdf) return false;
+ if (this.dataService.getTripleStoreQueueLength() > constants.HANDLE_STORE_BUSINESS_LIMIT) {
+ return constants.NETWORK_RESPONSES.BUSY;
+ }
const operationId = uuidv1();
this.logger.emit({
msg: 'Started measuring execution of handle store command',
@@ -117,23 +132,39 @@ class PublishService {
Id_operation: operationId,
});
- const { jsonld, nquads } = await this.dataService.createAssertion(data.nquads);
- const status = await this.dataService.verifyAssertion(jsonld, nquads);
+ try {
+ const { jsonld, nquads } = await this.dataService.createAssertion(data.nquads);
+ const status = await this.dataService.verifyAssertion(jsonld, nquads);
- // todo check root hash on the blockchain
- if (status) {
- await this.dataService.insert(data.nquads.join('\n'), `${constants.DID_PREFIX}:${data.id}`);
- this.logger.info(`Assertion ${data.id} has been successfully inserted`);
- }
+ // todo check root hash on the blockchain
+ if (status) {
+ await this.dataService.insert(data.nquads.join('\n'), `${constants.DID_PREFIX}:${data.id}`);
+ this.logger.info(`Assertion ${data.id} has been successfully inserted`);
+ }
- this.logger.emit({
- msg: 'Finished measuring execution of handle store command',
- Event_name: 'handle_store_end',
- Operation_name: 'handle_store',
- Id_operation: operationId,
- });
+ this.logger.emit({
+ msg: 'Finished measuring execution of handle store command',
+ Event_name: 'handle_store_end',
+ Operation_name: 'handle_store',
+ Id_operation: operationId,
+ });
- return status;
+ return status;
+ } catch (e) {
+ this.logger.emit({
+ msg: 'Finished measuring execution of handle store command',
+ Event_name: 'handle_store_end',
+ Operation_name: 'handle_store',
+ Id_operation: operationId,
+ });
+ this.logger.error({
+ msg: `Error while handling store: ${e} - ${e.stack}`,
+ Operation_name: 'Error',
+ Event_name: constants.ERROR_TYPE.HANDLE_STORE_ERROR,
+ Id_operation: operationId,
+ });
+ return false;
+ }
}
}
diff --git a/modules/service/query-service.js b/modules/service/query-service.js
index 08f34cb82f..fbda2a3849 100644
--- a/modules/service/query-service.js
+++ b/modules/service/query-service.js
@@ -12,21 +12,34 @@ class QueryService {
}
async resolve(id, load, isAssetRequested, node) {
- let result = await this.networkService.sendMessage('/resolve', id, node);
- if (!result || (Array.isArray(result) && result[0] === "ack")) {
+ const resolvePromise = new Promise(async (resolve, reject) => {
+ const timer = setTimeout(() => {
+ resolve(null);
+ }, constants.RESOLVE_MAX_TIME_MILLIS);
+
+ const result = await this.networkService.sendMessage('/resolve', id, node);
+ clearTimeout(timer);
+ resolve(result);
+ });
+
+ const result = await resolvePromise;
+ if (!result || (Array.isArray(result) && result[0] === constants.NETWORK_RESPONSES.ACK)) {
return null;
}
- const { isAsset } = result;
const rawNquads = result.nquads ? result.nquads : result;
- let assertion = await this.dataService.createAssertion(rawNquads);
- const status = await this.dataService.verifyAssertion(assertion.jsonld, assertion.nquads, {isAsset: isAssetRequested});
+ const assertion = await this.dataService.createAssertion(rawNquads);
+ const status = await this.dataService.verifyAssertion(
+ assertion.jsonld,
+ assertion.nquads,
+ { isAsset: isAssetRequested },
+ );
if (status && load) {
await this.dataService.insert(rawNquads.join('\n'), `${constants.DID_PREFIX}:${assertion.jsonld.metadata.id}`);
this.logger.info(`Assertion ${assertion.jsonld.metadata.id} has been successfully inserted`);
}
- return status ? { assertion, isAsset } : null;
+ return status ? assertion : null;
}
async handleResolve(id) {
@@ -38,7 +51,7 @@ class QueryService {
Id_operation: operationId,
});
- const { nquads, isAsset } = await this.dataService.resolve(id);
+ const nquads = await this.dataService.resolve(id);
if (nquads) {
this.logger.info(`Number of n-quads retrieved from the database is ${nquads.length}`);
}
@@ -53,7 +66,7 @@ class QueryService {
if (!nquads) {
return null;
}
- return { nquads, isAsset };
+ return nquads;
}
async search(data, node) {
@@ -104,7 +117,7 @@ class QueryService {
}
const rawNquads = assertion.nquads ? assertion.nquads : assertion.rdf;
- const { jsonld, nquads } = await this.dataService.createAssertion(rawNquads);
+ const jsonld = await this.dataService.createAssertion(rawNquads);
let object = handlerData.find((x) => x.type === jsonld.metadata.type && x.id === jsonld.metadata.UALs[0])
if (!object) {
object = {
diff --git a/ot-node.js b/ot-node.js
index 9d5dec8af7..a91ea3b923 100644
--- a/ot-node.js
+++ b/ot-node.js
@@ -3,14 +3,13 @@ const DeepExtend = require('deep-extend');
const AutoGitUpdate = require('auto-git-update');
const rc = require('rc');
const fs = require('fs');
+const queue = require('fastq');
const DependencyInjection = require('./modules/service/dependency-injection');
const Logger = require('./modules/logger/logger');
const constants = require('./modules/constants');
const db = require('./models');
const pjson = require('./package.json');
const configjson = require('./config/config.json');
-const queue = require('fastq')
-
class OTNode {
constructor(config) {
@@ -67,6 +66,7 @@ class OTNode {
DependencyInjection.registerValue(this.container, 'logger', this.logger);
DependencyInjection.registerValue(this.container, 'constants', constants);
DependencyInjection.registerValue(this.container, 'blockchainQueue', queue);
+ DependencyInjection.registerValue(this.container, 'tripleStoreQueue', queue);
this.logger.info('Dependency injection module is initialized');
}
diff --git a/package.json b/package.json
index 21565a5db3..cef55ee28a 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
- "version": "6.0.0-beta.1.28",
+ "version": "6.0.0-beta.1.29",
"description": "OTNode v6 Beta 1",
"main": "index.js",
"scripts": {
@@ -54,6 +54,7 @@
"express": "^4.17.1",
"express-fileupload": "^1.2.1",
"express-ipfilter": "^1.2.0",
+ "express-slow-down": "^1.4.0",
"fast-sort": "^3.1.1",
"fastq": "^1.13.0",
"fs-extra": "^10.0.0",