diff --git a/.github/release-drafter.yml b/.github/release-drafter.yml new file mode 100644 index 0000000000..abd87b50b0 --- /dev/null +++ b/.github/release-drafter.yml @@ -0,0 +1,21 @@ +name-template: 'v$NEXT_PATCH_VERSION 🌈' +tag-template: 'v$NEXT_PATCH_VERSION' +categories: + - title: '🚀 Features' + labels: + - 'feature' + - 'enhancement' + - title: '🐛 Bug Fixes' + labels: + - 'fix' + - 'bugfix' + - 'bug' + - title: '🧰 Maintenance' + labels: + - 'chore' + - 'internal process' +change-template: '- $TITLE @$AUTHOR (#$NUMBER)' +template: | + ## Changes + + $CHANGES \ No newline at end of file diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml new file mode 100644 index 0000000000..cae6530020 --- /dev/null +++ b/.github/workflows/release-drafter.yml @@ -0,0 +1,17 @@ +name: Release Drafter + +on: + push: + # branches to consider in the event; optional, defaults to all + branches: + - develop + +jobs: + update_release_draft: + runs-on: ubuntu-latest + steps: + # Drafts your next Release notes as Pull Requests are merged into "master" + - uses: release-drafter/release-drafter@v5 + with: + # (Optional) specify config name to use, relative to .github/. Default: release-drafter.yml + # config-name: my-config.yml \ No newline at end of file diff --git a/modules/ImportUtilities.js b/modules/ImportUtilities.js index f13b78268d..198b45523d 100644 --- a/modules/ImportUtilities.js +++ b/modules/ImportUtilities.js @@ -574,6 +574,40 @@ class ImportUtilities { return header; } + + /** + * Extract Dataset creator identifier value from OT-JSON or graph header + * @static + * @param datasetHeader Header of the dataset in which the dataCreator field exists + * @returns String - Dataset creator identifier value (Currently ERC725 Identity) + */ + static getDataCreator(datasetHeader) { + return datasetHeader.dataCreator.identifiers[0].identifierValue; + } + + /** + * Process successfull import + * @static + * @param unpack Unpack keys + * @param objects Graph vertices and edges + * @return {Promise<>} + */ + static unpackKeysAndSortVertices(objects, unpack = false) { + let { + vertices, edges, + } = objects; + if (unpack) { + ImportUtilities.unpackKeys(vertices, edges); + } + + edges = Graph.sortVertices(edges); + vertices = Graph.sortVertices(vertices); + + return { + vertices, + edges, + }; + } } module.exports = ImportUtilities; diff --git a/modules/command/dc/dc-challenges-command.js b/modules/command/dc/dc-challenges-command.js index f63dda35c9..b19db57366 100644 --- a/modules/command/dc/dc-challenges-command.js +++ b/modules/command/dc/dc-challenges-command.js @@ -38,7 +38,6 @@ class DCChallengesCommand extends Command { offer_id: challenge.offer_id, }, }); - if (challenged.status !== 'HOLDING') { return; } @@ -46,7 +45,6 @@ class DCChallengesCommand extends Command { challenge.status = 'IN_PROGRESS'; await challenge.save({ fields: ['status'] }); - challenged.status = 'CHALLENGING'; await challenged.save({ fields: ['status'] }); diff --git a/modules/command/dc/dc-convert-to-graph-command.js b/modules/command/dc/dc-convert-to-graph-command.js index 6347e8e329..9291d02fac 100644 --- a/modules/command/dc/dc-convert-to-graph-command.js +++ b/modules/command/dc/dc-convert-to-graph-command.js @@ -22,6 +22,7 @@ class DcConvertToGraphCommand extends Command { data: { error: { message: error.message }, handler_id: command.data.handler_id, + documentPath: command.data.documentPath, }, }); } diff --git a/modules/command/dc/dc-convert-to-ot-json-command.js b/modules/command/dc/dc-convert-to-ot-json-command.js index 42d58d373a..3b5f3c297e 100644 --- a/modules/command/dc/dc-convert-to-ot-json-command.js +++ b/modules/command/dc/dc-convert-to-ot-json-command.js @@ -1,3 +1,5 @@ +const path = require('path'); +const fs = require('fs'); const Command = require('../command'); const ImportUtilities = require('../../ImportUtilities'); @@ -16,11 +18,17 @@ class DcConvertToOtJsonCommand extends Command { * @param command */ async execute(command) { - const { standard_id } = command.data; + const { standard_id, documentPath, handler_id } = command.data; try { if (standard_id === 'ot-json') { - command.data.document = JSON.parse(command.data.document); - if (!command.data.document.signature) { command.data.document = ImportUtilities.prepareDataset(command.data.document['@graph'], this.config, this.web3); } + let document = JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' })); + + if (!document.signature) { + document = ImportUtilities.prepareDataset(document['@graph'], this.config, this.web3); + } + + fs.writeFileSync(documentPath, JSON.stringify(document)); + return this.continueSequence(command.data, command.sequence); } await this.importWorkerController.startOtjsonConverterWorker(command, standard_id); @@ -31,7 +39,8 @@ class DcConvertToOtJsonCommand extends Command { transactional: false, data: { error: { message: error.message }, - handler_id: command.data.handler_id, + handler_id, + documentPath, }, }); } diff --git a/modules/command/dc/dc-finalize-import-command.js b/modules/command/dc/dc-finalize-import-command.js index fb13c3cfd7..cbc7ca4e89 100644 --- a/modules/command/dc/dc-finalize-import-command.js +++ b/modules/command/dc/dc-finalize-import-command.js @@ -1,10 +1,7 @@ +const fs = require('fs'); const Command = require('../command'); const Models = require('../../../models'); -const bytes = require('utf8-length'); const Utilities = require('../../Utilities'); -const { sha3_256 } = require('js-sha3'); -const ImportUtilities = require('../../ImportUtilities'); -const Graph = require('../../Graph'); class DcFinalizeImport extends Command { constructor(ctx) { @@ -20,38 +17,38 @@ class DcFinalizeImport extends Command { * @param command */ async execute(command) { - const { afterImportData, error } = command.data; - if (error) { - await this._processError(error, command.data.handler_id); - return Command.empty(); - } - const response = await this._unpackKeysAndSortVertices(afterImportData); - - const { - handler_id, otjson_size_in_bytes, total_documents, purchased, - } = afterImportData; const { + error, + handler_id, data_set_id, + data_provider_wallet, + purchased, + documentPath, root_hash, - wallet, // TODO: Sender's wallet is ignored for now. - vertices, - edges, - } = response; + data_hash, + otjson_size_in_bytes, + total_documents, + } = command.data; + + await Utilities.deleteDirectory(documentPath); + + if (error) { + await this._processError(error, handler_id, documentPath); + return Command.empty(); + } try { - const importTimestamp = new Date(); - const graphObject = {}; - Object.assign(graphObject, { vertices, edges }); - const dataHash = Utilities.normalizeHex(sha3_256(`${graphObject}`)); + const import_timestamp = new Date(); + this.remoteControl.importRequestData(); await Models.data_info.create({ data_set_id, root_hash, - data_provider_wallet: this.config.node_wallet, - import_timestamp: importTimestamp, + data_provider_wallet: data_provider_wallet || this.config.node_wallet, + import_timestamp, total_documents, origin: purchased ? 'PURCHASED' : 'IMPORTED', otjson_size_in_bytes, - data_hash: dataHash, + data_hash, }).catch(async (error) => { this.logger.error(error); this.notifyError(error); @@ -76,12 +73,10 @@ class DcFinalizeImport extends Command { status: 'COMPLETED', data: JSON.stringify({ dataset_id: data_set_id, - import_time: importTimestamp.valueOf(), + import_time: import_timestamp.valueOf(), otjson_size_in_bytes, root_hash, - data_hash: dataHash, - total_graph_entities: vertices.length - + edges.length, + data_hash, }), }, { @@ -90,6 +85,7 @@ class DcFinalizeImport extends Command { }, }, ); + this.logger.info('Import complete'); this.logger.info(`Root hash: ${root_hash}`); this.logger.info(`Data set ID: ${data_set_id}`); @@ -130,7 +126,7 @@ class DcFinalizeImport extends Command { return command; } - async _processError(error, handlerId) { + async _processError(error, handlerId, documentPath) { this.logger.error(error.message); await Models.handler_ids.update( { @@ -151,37 +147,6 @@ class DcFinalizeImport extends Command { this.notifyError(error); } } - - /** - * Process successfull import - * @param unpack Unpack keys - * @param result Import result - * @return {Promise<>} - */ - _unpackKeysAndSortVertices(result, unpack = false) { - this.remoteControl.importRequestData(); - const { - data_set_id, wallet, root_hash, - } = result; - let { - vertices, edges, - } = result; - if (unpack) { - ImportUtilities.unpackKeys(vertices, edges); - } - - edges = Graph.sortVertices(edges); - vertices = Graph.sortVertices(vertices); - - return { - data_set_id, - root_hash, - total_documents: edges.length + vertices.length, - vertices, - edges, - wallet, - }; - } } module.exports = DcFinalizeImport; diff --git a/modules/command/dc/dc-litigation-completed-command.js b/modules/command/dc/dc-litigation-completed-command.js index 51e820b48a..d80c9c080f 100644 --- a/modules/command/dc/dc-litigation-completed-command.js +++ b/modules/command/dc/dc-litigation-completed-command.js @@ -66,40 +66,12 @@ class DCLitigationCompletedCommand extends Command { }); this.logger.info(`Challenges removed for DH with identity ${dhIdentity} and offer ${offerId}.`); - const offer = await models.offers.findOne({ - where: { - offer_id: offerId, - }, - }); - - offer.global_status = 'REPLACEMENT_STARTED'; - await offer.save({ fields: ['global_status'] }); - this.remoteControl.offerUpdate({ - offer_id: offerId, - }); - await models.reputation_data.create({ dh_identity: dhIdentity, offer_id: offerId, reputation_delta: '-1', timestamp: Date.now(), }); - - return { - commands: [ - { - data: { - offerId, - dhIdentity, - }, - name: 'dcLitigationReplacementStartedCommand', - delay: 0, - period: 5000, - deadline_at: Date.now() + (5 * 60 * 1000), - transactional: false, - }, - ], - }; } const offer = await models.offers.findOne({ @@ -108,12 +80,17 @@ class DCLitigationCompletedCommand extends Command { }, }); - offer.global_status = 'ACTIVE'; - await offer.save({ fields: ['global_status'] }); - this.remoteControl.offerUpdate({ - offer_id: offerId, + const holdingCount = await models.replicated_data.count({ + where: { offer_id: offerId, status: 'HOLDING' }, }); - this.logger.important(`DH ${dhIdentity} has successfully answered litigation.`); + + if (holdingCount === 0) { + offer.global_status = 'FAILED'; + await offer.save({ fields: ['global_status'] }); + this.remoteControl.offerUpdate({ + offer_id: offerId, + }); + } return Command.empty(); } } diff --git a/modules/command/dc/dc-litigation-initiate-command.js b/modules/command/dc/dc-litigation-initiate-command.js index 5796f4a956..70a53e93e9 100644 --- a/modules/command/dc/dc-litigation-initiate-command.js +++ b/modules/command/dc/dc-litigation-initiate-command.js @@ -50,17 +50,23 @@ class DCLitigationInitiateCommand extends Command { return Command.empty(); } - if (offer.global_status !== 'ACTIVE') { + const replicatedData = await models.replicated_data.findOne({ + where: { offer_id: offerId, dh_identity: dhIdentity }, + }); + + if (replicatedData.status === 'PENALIZED') { + this.logger.trace(`Holder with id: ${dhIdentity} for offer ${offerId} was already penalized`); + return Command.empty(); + } + + if (replicatedData.status !== 'CHALLENGING') { // litigation or replacement is in progress this.logger.trace(`Litigation already in progress... It needs to be completed in order to litigate ${dhIdentity} for offer ${offerId}`); return Command.repeat(); // wait for offer to be active } - offer.global_status = 'LITIGATION_INITIATED'; - await offer.save(({ fields: ['global_status'] })); - this.remoteControl.offerUpdate({ - offer_id: offerId, - }); + replicatedData.status = 'LITIGATION_STARTED'; + await replicatedData.save({ fields: ['status'] }); const dcIdentity = utilities.normalizeHex(this.config.erc725Identity); const otJson = await this.importService.getImport(offer.data_set_id); diff --git a/modules/command/dc/dc-litigation-initiated-command.js b/modules/command/dc/dc-litigation-initiated-command.js index 64987774d5..e16fdef816 100644 --- a/modules/command/dc/dc-litigation-initiated-command.js +++ b/modules/command/dc/dc-litigation-initiated-command.js @@ -48,12 +48,6 @@ class DcLitigationInitiatedCommand extends Command { this.logger.important(`Litigation initiated for DH ${dhIdentity} and offer ${offerId}.`); - const replicatedData = await Models.replicated_data.findOne({ - where: { offer_id: offerId, dh_identity: dhIdentity }, - }); - replicatedData.status = 'LITIGATION_STARTED'; - await replicatedData.save({ fields: ['status'] }); - const offer = await Models.offers.findOne({ where: { offer_id: offerId }, }); diff --git a/modules/command/dc/dc-offer-finalized-command.js b/modules/command/dc/dc-offer-finalized-command.js index 14759fa075..492246acbd 100644 --- a/modules/command/dc/dc-offer-finalized-command.js +++ b/modules/command/dc/dc-offer-finalized-command.js @@ -152,7 +152,7 @@ class DcOfferFinalizedCommand extends Command { const encryptionColor = this.replicationService.castNumberToColor(replicatedData.color); const encryptedGraph = - this.replicationService.replicationCache[offer.id][encryptionColor].otJson['@graph']; + (await this.replicationService.loadReplication(offer.id, encryptionColor)).otJson['@graph']; const challenges = this.challengeService.generateChallenges( encryptedGraph, startTime, endTime, this.config.numberOfChallenges, diff --git a/modules/command/dc/dc-offer-prepare-command.js b/modules/command/dc/dc-offer-prepare-command.js index 19b4569e4c..769bf778d1 100644 --- a/modules/command/dc/dc-offer-prepare-command.js +++ b/modules/command/dc/dc-offer-prepare-command.js @@ -23,15 +23,7 @@ class DCOfferPrepareCommand extends Command { internalOfferId, } = command.data; - const colorsInfo = await this.replicationService.createReplications(internalOfferId); - const distLitRootHashes = (await Promise.all(colorsInfo.map(async (cInfo) => { - await this.replicationService.saveReplication(internalOfferId, cInfo.color, cInfo); - - const hashes = {}; - hashes[`${cInfo.color}LitigationHash`] = cInfo.litigationRootHash; - hashes[`${cInfo.color}DistributionHash`] = cInfo.distributionRootHash; - return hashes; - }))).reduce((acc, value) => Object.assign(acc, value)); + const distLitRootHashes = await this.replicationService.createReplications(internalOfferId); const { data } = command; Object.assign(data, distLitRootHashes); diff --git a/modules/command/dc/dc-write-import-to-graph-db-command.js b/modules/command/dc/dc-write-import-to-graph-db-command.js index 78a1981aa9..0b09409d35 100644 --- a/modules/command/dc/dc-write-import-to-graph-db-command.js +++ b/modules/command/dc/dc-write-import-to-graph-db-command.js @@ -1,6 +1,8 @@ +const fs = require('fs'); const Command = require('../command'); const { forEachSeries } = require('p-iteration'); const Utilities = require('../../Utilities'); +const ImportUtilities = require('../../ImportUtilities'); const { sha3_256 } = require('js-sha3'); class DcWriteImportToGraphDbCommand extends Command { @@ -17,17 +19,102 @@ class DcWriteImportToGraphDbCommand extends Command { * @param command */ async execute(command) { + this.logger.info('Importing data to database'); + const { + handler_id, documentPath, + } = command.data; try { - this.logger.info('Importing data to database'); - await this.writeToDb({ - data: command.data.dbData, + const { vertices, edges, metadata } = JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' })); + const dataCreator = ImportUtilities.getDataCreator(metadata.datasetHeader); + + await forEachSeries(vertices, vertex => this.graphStorage.addVertex(vertex)); + await forEachSeries(edges, edge => this.graphStorage.addEdge(edge)); + + await forEachSeries(vertices.filter(vertex => vertex.vertexType === 'Connector'), async (vertex) => { + const { identifierValue } = vertices.find(v => edges.filter(edge => edge._from === vertex._key && ['IDENTIFIED_BY'].includes(edge.relationType)).map(edge => edge._to).includes(v._key)); + const { data } = vertices.find(v => edges.filter(edge => edge._from === vertex._key && ['HAS_DATA'].includes(edge.relationType)).map(edge => edge._to).includes(v._key)); + // Connect to other connectors if available. + + const connectorIdentifierVertexKey = Utilities.keyFrom('id', identifierValue); + const relatedConnectors = + await this.graphStorage.findConnectors(connectorIdentifierVertexKey); + + await forEachSeries( + relatedConnectors.filter(v => v._key !== vertex._key), + async (relatedVertex) => { + let hasConnection1 = false; + if (relatedVertex.expectedConnectionCreators != null) { + relatedVertex.expectedConnectionCreators.forEach((expectedCreator) => { + const expectedErc725 = this._value(expectedCreator); + + if (dataCreator === expectedErc725) { + hasConnection1 = true; + } + }); + } + + let hasConnection2 = false; + await Promise.all(relatedVertex.datasets + .map(datasetId => new Promise(async (accept, reject) => { + try { + if (hasConnection2 === false) { + const metadata = await this.graphStorage + .findMetadataByImportId(datasetId); + + if (data.expectedConnectionCreators != null) { + data.expectedConnectionCreators + .forEach((expectedCreator) => { + const expectedErc725 = + this._value(expectedCreator); + + if (metadata && expectedErc725 === + metadata.datasetHeader.dataCreator + .identifiers.find(x => x.identifierType === 'ERC725').identifierValue) { + hasConnection2 = true; + } + }); + } + } + } catch (e) { + // console.log(e); + } finally { + accept(); + } + }))); + + if (!hasConnection1 || !hasConnection2) { + this.logger.warn(`Invalid connectors (${identifierValue}).`); + return; + } + + await this.graphStorage.addEdge({ + _key: Utilities.keyFrom(dataCreator, vertex._key, relatedVertex._key), + _from: vertex._key, + _to: relatedVertex._key, + relationType: 'CONNECTION_DOWNSTREAM', + edgeType: 'ConnectorRelation', + }); + + await this.graphStorage.addEdge({ + _key: Utilities.keyFrom(dataCreator, relatedVertex._key, vertex._key), + _from: relatedVertex._key, + _to: vertex._key, + relationType: 'CONNECTION_DOWNSTREAM', + edgeType: 'ConnectorRelation', + }); + }, + ); }); + + await this.graphStorage.addDatasetMetadata(metadata); } catch (error) { await this.commandExecutor.add({ name: 'dcFinalizeImportCommand', delay: 0, transactional: false, data: { + handler_id, + documentPath, error: { message: error.message }, }, }); @@ -51,96 +138,6 @@ class DcWriteImportToGraphDbCommand extends Command { return command; } - async writeToDb(data) { - const { - vertices, edges, metadata, dataCreator, - } = data.data; - - await forEachSeries(vertices, vertex => this.graphStorage.addVertex(vertex)); - await forEachSeries(edges, edge => this.graphStorage.addEdge(edge)); - - this.vertices = vertices; - this.edges = edges; - - await forEachSeries(vertices.filter(vertex => vertex.vertexType === 'Connector'), async (vertex) => { - const { identifierValue } = this.vertices.find(v => this.edges.filter(edge => edge._from === vertex._key && ['IDENTIFIED_BY'].includes(edge.relationType)).map(edge => edge._to).includes(v._key)); - const { data } = this.vertices.find(v => this.edges.filter(edge => edge._from === vertex._key && ['HAS_DATA'].includes(edge.relationType)).map(edge => edge._to).includes(v._key)); - // Connect to other connectors if available. - - const connectorIdentifierVertexKey = Utilities.keyFrom('id', identifierValue); - const relatedConnectors = - await this.graphStorage.findConnectors(connectorIdentifierVertexKey); - - await forEachSeries( - relatedConnectors.filter(v => v._key !== vertex._key), - async (relatedVertex) => { - let hasConnection1 = false; - if (relatedVertex.expectedConnectionCreators != null) { - relatedVertex.expectedConnectionCreators.forEach((expectedCreator) => { - const expectedErc725 = this._value(expectedCreator); - - if (dataCreator === expectedErc725) { - hasConnection1 = true; - } - }); - } - - let hasConnection2 = false; - await Promise.all(relatedVertex.datasets - .map(datasetId => new Promise(async (accept, reject) => { - try { - if (hasConnection2 === false) { - const metadata = await this.graphStorage - .findMetadataByImportId(datasetId); - - if (data.expectedConnectionCreators != null) { - data.expectedConnectionCreators - .forEach((expectedCreator) => { - const expectedErc725 = this._value(expectedCreator); - - if (metadata && expectedErc725 === - metadata.datasetHeader.dataCreator.identifiers - .find(x => x.identifierType === 'ERC725').identifierValue) { - hasConnection2 = true; - } - }); - } - } - } catch (e) { - // console.log(e); - } finally { - accept(); - } - }))); - - if (!hasConnection1 || !hasConnection2) { - this.logger.warn(`Invalid connectors (${identifierValue}).`); - return; - } - - await this.graphStorage.addEdge({ - _key: Utilities.keyFrom(dataCreator, vertex._key, relatedVertex._key), - _from: vertex._key, - _to: relatedVertex._key, - relationType: 'CONNECTION_DOWNSTREAM', - edgeType: 'ConnectorRelation', - }); - - await this.graphStorage.addEdge({ - _key: Utilities.keyFrom(dataCreator, relatedVertex._key, vertex._key), - _from: relatedVertex._key, - _to: vertex._key, - relationType: 'CONNECTION_DOWNSTREAM', - edgeType: 'ConnectorRelation', - }); - }, - ); - }); - - await this.graphStorage.addDatasetMetadata(metadata); - } - - /** * Returns value of '@value' property. * @param jsonLdObject JSON-LD object. diff --git a/modules/command/dh/dh-offer-handle-command.js b/modules/command/dh/dh-offer-handle-command.js index 9d090d9a42..79a164711a 100644 --- a/modules/command/dh/dh-offer-handle-command.js +++ b/modules/command/dh/dh-offer-handle-command.js @@ -1,5 +1,8 @@ +const path = require('path'); +const fs = require('fs'); const Command = require('../command'); const Models = require('../../../models/index'); +const Utilities = require('../../Utilities'); /** * Handles new offer from the DH side @@ -57,9 +60,18 @@ class DHOfferHandleCommand extends Command { this.logger.notify(`Replication request for ${offerId} sent to ${dcNodeId}. Response received.`); + const cacheDirectory = path.join(this.config.appDataPath, 'import_cache'); + + await Utilities.writeContentsToFile( + cacheDirectory, + offerId, + JSON.stringify(response.otJson), + ); + const packedResponse = DHOfferHandleCommand._stripResponse(response); Object.assign(packedResponse, { dcNodeId, + documentPath: path.join(cacheDirectory, offerId), }); return { commands: [ @@ -81,7 +93,6 @@ class DHOfferHandleCommand extends Command { return { offerId: response.offer_id, dataSetId: response.data_set_id, - otJson: response.otJson, dcWallet: response.dc_wallet, dcNodeId: response.dcNodeId, litigationPublicKey: response.litigation_public_key, diff --git a/modules/command/dh/dh-replication-import-command.js b/modules/command/dh/dh-replication-import-command.js index 97460d9192..793c1f394a 100644 --- a/modules/command/dh/dh-replication-import-command.js +++ b/modules/command/dh/dh-replication-import-command.js @@ -1,8 +1,7 @@ -const BN = require('../../../node_modules/bn.js/lib/bn'); const bytes = require('utf8-length'); +const fs = require('fs'); const { sha3_256 } = require('js-sha3'); const Command = require('../command'); -const MerkleTree = require('../../Merkle'); const Encryption = require('../../Encryption'); const Utilities = require('../../Utilities'); const Models = require('../../../models/index'); @@ -33,7 +32,7 @@ class DhReplicationImportCommand extends Command { const { offerId, dataSetId, - otJson, + documentPath, dcWallet, dcNodeId, litigationPublicKey, @@ -44,6 +43,8 @@ class DhReplicationImportCommand extends Command { transactionHash, encColor, } = command.data; + const otJson = JSON.parse(fs.readFileSync(documentPath, { encoding: 'utf-8' })); + const { decryptedDataset, encryptedMap } = await ImportUtilities.decryptDataset(otJson, litigationPublicKey, offerId, encColor); const calculatedDataSetId = @@ -113,6 +114,8 @@ class DhReplicationImportCommand extends Command { encryptedMap, }); + fs.unlinkSync(documentPath); + if (importResult.error) { throw Error(importResult.error); } diff --git a/modules/command/dv/dv-data-read-response-free-command.js b/modules/command/dv/dv-data-read-response-free-command.js index 3403bd3d04..f9eabc20e0 100644 --- a/modules/command/dv/dv-data-read-response-free-command.js +++ b/modules/command/dv/dv-data-read-response-free-command.js @@ -1,4 +1,6 @@ const bytes = require('utf8-length'); +const fs = require('fs'); +const path = require('path'); const Models = require('../../../models/index'); const Command = require('../command'); @@ -98,9 +100,18 @@ class DVDataReadResponseFreeCommand extends Command { } try { + const cacheDirectory = path.join(this.config.appDataPath, 'import_cache'); + + await Utilities.writeContentsToFile( + cacheDirectory, + handler_id, + JSON.stringify(document), + ); + const commandData = { - document, + documentPath: path.join(cacheDirectory, handler_id), handler_id, + data_provider_wallet: dcWallet, purchased: true, }; diff --git a/modules/constants.js b/modules/constants.js index 7777c43dca..917126f26f 100644 --- a/modules/constants.js +++ b/modules/constants.js @@ -1,7 +1,7 @@ /** * @constant {number} DEFAULT_CHALLENGE_NUMBER_OF_TESTS - Number of challenges per DH */ -exports.DEFAULT_CHALLENGE_NUMBER_OF_TESTS = 10; +exports.DEFAULT_CHALLENGE_NUMBER_OF_TESTS = 2; /** * @constant {number} DEFAULT_CHALLENGE_BLOCK_SIZE_BYTES - Block size in bytes used for Merkle tree @@ -29,7 +29,7 @@ exports.DEFAULT_COMMAND_CLEANUP_TIME_MILLS = 4 * 24 * 60 * 60 * 1000; */ exports.PERMANENT_COMMANDS = [ 'cleanerCommand', 'dcChallengesCommand', 'dhLitigationInitiatedCommand', - 'dhReplacementStartedCommand', 'reputationUpdateCommand', 'autoupdaterCommand']; + 'reputationUpdateCommand', 'autoupdaterCommand']; /** * @constant {number} MAX_COMMAND_DELAY_IN_MILLS - Maximum delay for commands diff --git a/modules/service/import-service.js b/modules/service/import-service.js index 0579883bdb..ef801582f3 100644 --- a/modules/service/import-service.js +++ b/modules/service/import-service.js @@ -1,3 +1,5 @@ +const path = require('path'); +const fs = require('fs'); const ImportUtilities = require('../ImportUtilities'); const Utilities = require('../Utilities'); const { sha3_256 } = require('js-sha3'); @@ -75,6 +77,7 @@ class ImportService { this.schemaValidator = ctx.schemaValidator; this.web3 = ctx.web3; this.log = ctx.logger; + this.config = ctx.config; } async getImport(datasetId, encColor = null) { @@ -123,13 +126,33 @@ class ImportService { const datasetId = _id(document); const header = document.datasetHeader; - const dataCreator = document.datasetHeader.dataCreator.identifiers[0].identifierValue; + const dataCreator = ImportUtilities.getDataCreator(header); // Result const vertices = []; const edges = []; const objectIds = []; document['@graph'].forEach((otObject) => { + if (!_id(otObject) && _id(otObject) !== '') { + throw Error('OT-JSON object missing @id parameter'); + } + + if (!_type(otObject) && _type(otObject) !== '') { + throw Error(`OT-JSON object ${_id(otObject)} missing @type parameter`); + } + + if (!otObject.identifiers) { + throw Error(`OT-JSON object ${_id(otObject)} missing identifiers parameter`); + } + + if (!otObject.properties) { + throw Error(`OT-JSON object ${_id(otObject)} missing properties parameter`); + } + + if (!otObject.relations) { + throw Error(`OT-JSON object ${_id(otObject)} missing relations parameter`); + } + objectIds.push(Utilities.keyFrom(dataCreator, _id(otObject))); switch (_type(otObject)) { @@ -573,6 +596,7 @@ class ImportService { const otObjects = []; for (let i = 0; i < reconstructedObjects.length; i += 1) { + ImportUtilities.sortGraphRecursively([reconstructedObjects[i]]); if (reconstructedObjects[i] && reconstructedObjects[i]['@id']) { otObjects.push({ otObject: reconstructedObjects[i], diff --git a/modules/service/replication-service.js b/modules/service/replication-service.js index d9ca227082..d80b783b13 100644 --- a/modules/service/replication-service.js +++ b/modules/service/replication-service.js @@ -1,9 +1,9 @@ const BN = require('bn.js'); const path = require('path'); +const fs = require('fs'); const Encryption = require('../Encryption'); const ImportUtilities = require('../ImportUtilities'); -const MerkleTree = require('../Merkle'); const Models = require('../../models/index'); const Utilities = require('../Utilities'); @@ -23,8 +23,13 @@ class ReplicationService { this.config = ctx.config; this.graphStorage = ctx.graphStorage; this.challengeService = ctx.challengeService; - this.replicationCache = {}; this.importService = ctx.importService; + + const replicationPath = path.join(this.config.appDataPath, 'replication_cache'); + + if (!fs.existsSync(replicationPath)) { + fs.mkdirSync(replicationPath); + } } /** @@ -39,54 +44,56 @@ class ReplicationService { } const otJson = await this.importService.getImport(offer.data_set_id); - const flavor = { - [COLOR.RED]: otJson, - [COLOR.BLUE]: Utilities.copyObject(otJson), - [COLOR.GREEN]: Utilities.copyObject(otJson), - }; - - const that = this; - this.replicationCache[internalOfferId] = {}; - return Promise.all([COLOR.RED, COLOR.BLUE, COLOR.GREEN] - .map(async (color) => { - const document = flavor[color]; - - - const litigationKeyPair = Encryption.generateKeyPair(512); - const distributionKeyPair = Encryption.generateKeyPair(512); - - const encryptedDataset = - ImportUtilities.encryptDataset(document, litigationKeyPair.privateKey); - - const distEncDataset = - ImportUtilities.encryptDataset(document, distributionKeyPair.privateKey); - - const litRootHash = this.challengeService.getLitigationRootHash(encryptedDataset['@graph']); - - const distRootHash = ImportUtilities.calculateDatasetRootHash(distEncDataset['@graph'], distEncDataset['@id'], distEncDataset.datasetHeader.dataCreator); - - const distEpk = Encryption.packEPK(distributionKeyPair.publicKey); - // const litigationEpk = Encryption.packEPK(distributionKeyPair.publicKey); - // TODO Why are there zeroes here - const distributionEpkChecksum = - Encryption.calculateDataChecksum(distEpk, 0, 0, 0); - - const replication = { - color, - otJson: encryptedDataset, - litigationPublicKey: litigationKeyPair.publicKey, - litigationPrivateKey: litigationKeyPair.privateKey, - distributionPublicKey: distributionKeyPair.publicKey, - distributionPrivateKey: distributionKeyPair.privateKey, - distributionEpkChecksum, - litigationRootHash: litRootHash, - distributionRootHash: distRootHash, - distributionEpk: distEpk, - }; - - that.replicationCache[internalOfferId][color] = replication; - return replication; - })); + + const hashes = {}; + + const writeFilePromises = []; + + for (let i = 0; i < 3; i += 1) { + const color = this.castNumberToColor(i); + + const litigationKeyPair = Encryption.generateKeyPair(512); + const distributionKeyPair = Encryption.generateKeyPair(512); + + // TODO Optimize encryption to reduce memory usage + let encryptedDataset = + ImportUtilities.encryptDataset(otJson, distributionKeyPair.privateKey); + + const distRootHash = ImportUtilities.calculateDatasetRootHash(encryptedDataset['@graph'], encryptedDataset['@id'], encryptedDataset.datasetHeader.dataCreator); + + encryptedDataset = + ImportUtilities.encryptDataset(otJson, litigationKeyPair.privateKey); + + const litRootHash = this.challengeService.getLitigationRootHash(encryptedDataset['@graph']); + + const distEpk = Encryption.packEPK(distributionKeyPair.publicKey); + // const litigationEpk = Encryption.packEPK(distributionKeyPair.publicKey); + // TODO Why are there zeroes here + const distributionEpkChecksum = + Encryption.calculateDataChecksum(distEpk, 0, 0, 0); + + const replication = { + color, + otJson: encryptedDataset, + litigationPublicKey: litigationKeyPair.publicKey, + litigationPrivateKey: litigationKeyPair.privateKey, + distributionPublicKey: distributionKeyPair.publicKey, + distributionPrivateKey: distributionKeyPair.privateKey, + distributionEpkChecksum, + litigationRootHash: litRootHash, + distributionRootHash: distRootHash, + distributionEpk: distEpk, + }; + + writeFilePromises.push(this.saveReplication(internalOfferId, color, replication)); + + hashes[`${color}LitigationHash`] = litRootHash; + hashes[`${color}DistributionHash`] = distRootHash; + } + + await Promise.all(writeFilePromises); + + return hashes; } /** @@ -133,8 +140,6 @@ class ReplicationService { * @return {Promise} */ async cleanup(internalOfferId) { - delete this.replicationCache[internalOfferId]; - this.logger.info(`Deleting replications directory and cache for offer with internal ID ${internalOfferId}`); const offerDirPath = this._getOfferDirPath(internalOfferId); await Utilities.deleteDirectory(offerDirPath); @@ -147,10 +152,8 @@ class ReplicationService { * @param internalOfferId */ async saveReplication(internalOfferId, color, data) { - this.replicationCache[internalOfferId][color] = data; - const offerDirPath = this._getOfferDirPath(internalOfferId); - await Utilities.writeContentsToFile(offerDirPath, `${color}.json`, JSON.stringify(data, null, 2)); + await Utilities.writeContentsToFile(offerDirPath, `${color}.json`, JSON.stringify(data)); } /** @@ -160,20 +163,13 @@ class ReplicationService { * @return {Promise<*>} */ async loadReplication(internalOfferId, color) { - let data; - if (this.replicationCache[internalOfferId]) { - data = this.replicationCache[internalOfferId][color]; - } - - if (data) { - this.logger.trace(`Loaded replication from cache for offer internal ID ${internalOfferId} and color ${color}`); - return data; - } - const offerDirPath = this._getOfferDirPath(internalOfferId); const colorFilePath = path.join(offerDirPath, `${color}.json`); + const data = JSON.parse(await Utilities.fileContents(colorFilePath)); this.logger.trace(`Loaded replication from file for offer internal ID ${internalOfferId} and color ${color}`); + + return data; } /** diff --git a/modules/service/rest-api-v2.js b/modules/service/rest-api-v2.js index d728317aad..871e532632 100644 --- a/modules/service/rest-api-v2.js +++ b/modules/service/rest-api-v2.js @@ -1,7 +1,9 @@ +const path = require('path'); +const fs = require('fs'); const pjson = require('../../package.json'); const RestAPIValidator = require('../validator/rest-api-validator'); const ImportUtilities = require('../ImportUtilities'); -const utilities = require('../Utilities'); +const Utilities = require('../Utilities'); const Models = require('../../models'); class RestAPIServiceV2 { @@ -116,6 +118,11 @@ class RestAPIServiceV2 { await this._checkForHandlerStatus(req, res); }); + server.post(`/api/${this.version_id}/challenges`, async (req, res) => { + await this._getChallenges(req, res); + }); + + /** Network related routes */ server.get(`/api/${this.version_id}/network/get-contact/:node_id`, async (req, res) => { const nodeId = req.params.node_id; @@ -268,7 +275,7 @@ class RestAPIServiceV2 { const humanReadable = req.query.humanReadable === 'true'; const walletEthBalance = await web3.eth.getBalance(config.node_wallet); - const walletTokenBalance = await utilities.getTracTokenBalance( + const walletTokenBalance = await Utilities.getTracTokenBalance( web3, config.node_wallet, blockchain.getTokenContractAddress(), @@ -377,8 +384,8 @@ class RestAPIServiceV2 { const { identifier_types, identifier_values } = req.body; - if (utilities.arrayze(identifier_types).length !== - utilities.arrayze(identifier_values).length) { + if (Utilities.arrayze(identifier_types).length !== + Utilities.arrayze(identifier_values).length) { res.status(400); res.send({ message: 'Identifier array length mismatch', @@ -394,13 +401,13 @@ class RestAPIServiceV2 { const keys = []; - const typesArray = utilities.arrayze(identifier_types); - const valuesArray = utilities.arrayze(identifier_values); + const typesArray = Utilities.arrayze(identifier_types); + const valuesArray = Utilities.arrayze(identifier_values); const { length } = typesArray; for (let i = 0; i < length; i += 1) { - keys.push(utilities.keyFrom(typesArray[i], valuesArray[i])); + keys.push(Utilities.keyFrom(typesArray[i], valuesArray[i])); } try { @@ -444,7 +451,7 @@ class RestAPIServiceV2 { const { object_ids, dataset_id } = req.body; const response = - await this.importService.getMerkleProofs(utilities.arrayze(object_ids), dataset_id); + await this.importService.getMerkleProofs(Utilities.arrayze(object_ids), dataset_id); res.status(200); res.send(response); @@ -601,6 +608,55 @@ class RestAPIServiceV2 { }); } + async _getChallenges(req, res) { + if (req.body === undefined) { + res.status(400); + res.send({ + message: 'Bad request', + }); + return; + } + + // Check if import type is valid + if (req.body.startDate === undefined || + req.body.endDate === undefined) { + res.status(400); + res.send({ + message: 'Bad request startDate and endDate required!', + }); + return; + } + + const challenges = await Models.challenges.findAll({ + where: { + start_time: { + [Models.Sequelize.Op.between]: + [(new Date(req.body.startDate)).getTime(), + (new Date(req.body.endDate)).getTime()], + }, + status: { + [Models.Sequelize.Op.not]: 'PENDING', + }, + }, + order: [ + ['start_time', 'ASC'], + ], + }); + const returnChallenges = []; + challenges.forEach((challenge) => { + const answered = !!challenge.answer; + returnChallenges.push({ + offer_id: challenge.offer_id, + start_time: challenge.start_time, + status: challenge.status, + answered, + }); + }); + + res.status(200); + res.send(returnChallenges); + } + // This is hardcoded import in case it is needed to make new importer with this method async _importDataset(req, res) { this.logger.api('POST: Import of data request received.'); @@ -629,7 +685,7 @@ class RestAPIServiceV2 { let fileContent; if (req.files !== undefined && req.files.file !== undefined) { const inputFile = req.files.file.path; - fileContent = await utilities.fileContents(inputFile); + fileContent = await Utilities.fileContents(inputFile); } else if (req.body.file !== undefined) { fileContent = req.body.file; } @@ -639,9 +695,32 @@ class RestAPIServiceV2 { const inserted_object = await Models.handler_ids.create({ status: 'PENDING', }); + + const cacheDirectory = path.join(this.config.appDataPath, 'import_cache'); + + try { + await Utilities.writeContentsToFile( + cacheDirectory, + inserted_object.dataValues.handler_id, + fileContent, + ); + } catch (e) { + const filePath = + path.join(cacheDirectory, inserted_object.dataValues.handler_id); + + if (fs.existsSync(filePath)) { + await Utilities.deleteDirectory(filePath); + } + res.status(500); + res.send({ + message: `Error when creating import cache file for handler_id ${inserted_object.dataValues.handler_id}. ${e.message}`, + }); + return; + } + const commandData = { standard_id, - document: fileContent, + documentPath: path.join(cacheDirectory, inserted_object.dataValues.handler_id), handler_id: inserted_object.dataValues.handler_id, }; const commandSequence = [ diff --git a/modules/transpiler/epcis/epcis-otjson-transpiler.js b/modules/transpiler/epcis/epcis-otjson-transpiler.js index 20854a11c5..8c1f5b4bf0 100644 --- a/modules/transpiler/epcis/epcis-otjson-transpiler.js +++ b/modules/transpiler/epcis/epcis-otjson-transpiler.js @@ -244,9 +244,13 @@ class EpcisOtJsonTranspiler { } const otVocabulary = { + '@id': vocabularyElement._attributes.id, + '@type': 'otObject', identifiers: [], relations: [], + properties, }; + // TODO Find out what happens when there is no _attribute.id if (vocabularyElement._attributes.id) { otVocabulary.identifiers = Object.entries(this._parseGS1Identifier(vocabularyElement._attributes.id)) @@ -255,10 +259,6 @@ class EpcisOtJsonTranspiler { otVocabulary.identifiers.push(...this._findIdentifiers(vocabularyElement)); - otVocabulary['@id'] = vocabularyElement._attributes.id; - otVocabulary.properties = properties; - otVocabulary['@type'] = 'otObject'; - if (vocabularyElement.children) { const compressedChildren = this._compressText(vocabularyElement.children); otVocabulary.properties.children = utilities.arrayze(compressedChildren.id); @@ -418,16 +418,16 @@ class EpcisOtJsonTranspiler { const otObject = { '@type': 'otObject', '@id': id, - identifiers: [{ - '@type': 'uuid', - '@value': id, - }, + identifiers: [ + { + '@type': 'uuid', + '@value': id, + }, ], - }; - - otObject.relations = []; - otObject.properties = { - objectType: eventType, + relations: [], + properties: { + objectType: eventType, + }, }; const foundIdentifiers = this._findIdentifiers(event); diff --git a/modules/transpiler/wot/wot-otjson-transpiler.js b/modules/transpiler/wot/wot-otjson-transpiler.js index 92b13f2628..30c1190851 100644 --- a/modules/transpiler/wot/wot-otjson-transpiler.js +++ b/modules/transpiler/wot/wot-otjson-transpiler.js @@ -93,20 +93,23 @@ class WotOtJsonTranspiler { const otObject = { '@type': 'otObject', '@id': id, - identifiers: [{ - '@type': 'id', - '@value': id, - }, { - '@type': 'internal_id', - '@value': property.id, - }, { - '@type': 'name', - '@value': property.name, - }, + identifiers: [ + { + '@type': 'id', + '@value': id, + }, + { + '@type': 'internal_id', + '@value': property.id, + }, + { + '@type': 'name', + '@value': property.name, + }, ], + properties: property.values, + relations: [], }; - otObject.properties = property.values; - otObject.relations = []; result.push(otObject); } @@ -171,19 +174,19 @@ class WotOtJsonTranspiler { const otObject = { '@type': 'otObject', '@id': id, - identifiers: [{ - '@type': 'id', - '@value': id, - }, + identifiers: [ + { + '@type': 'id', + '@value': id, + }, ], - }; - - otObject.relations = []; - otObject.properties = { - id: thing.id, - name: thing.name, - description: thing.description, - tags: thing.tags, + relations: [], + properties: { + id: thing.id, + name: thing.name, + description: thing.description, + tags: thing.tags, + }, }; const createRelation = (id, relType, data) => ({ @@ -243,17 +246,17 @@ class WotOtJsonTranspiler { const otObject = { '@type': 'otObject', '@id': obj.id, - identifiers: [{ - '@type': 'id', - '@value': obj.id, - }, + identifiers: [ + { + '@type': 'id', + '@value': obj.id, + }, ], + relations: [], + properties: { + '@type': obj.type, + }, }; - otObject.properties = { - '@type': obj.type, - }; - - otObject.relations = []; results.push(otObject); } diff --git a/modules/worker/graph-converter-worker.js b/modules/worker/graph-converter-worker.js index 310ae0632b..3e68d5b75f 100644 --- a/modules/worker/graph-converter-worker.js +++ b/modules/worker/graph-converter-worker.js @@ -1,5 +1,6 @@ const { sha3_256 } = require('js-sha3'); const Utilities = require('../Utilities'); +const ImportUtilities = require('../ImportUtilities'); /** * Returns value of '@id' property. @@ -73,19 +74,39 @@ Object.freeze(constants); process.on('message', async (dataFromParent) => { const { - document, encryptedMap, wallet, handler_id, + document, encryptedMap, } = JSON.parse(dataFromParent); try { const datasetId = _id(document); - const header = document.datasetHeader; - const dataCreator = document.datasetHeader.dataCreator.identifiers[0].identifierValue; + const dataCreator = ImportUtilities.getDataCreator(document.datasetHeader); // Result const vertices = []; const edges = []; document['@graph'].forEach((otObject) => { + if (!_id(otObject) && _id(otObject) !== '') { + throw Error('OT-JSON object missing @id parameter'); + } + + if (!_type(otObject) && _type(otObject) !== '') { + throw Error(`OT-JSON object ${_id(otObject)} missing @type parameter`); + } + + if (!otObject.identifiers) { + throw Error(`OT-JSON object ${_id(otObject)} missing identifiers parameter`); + } + + if (!otObject.properties) { + throw Error(`OT-JSON object ${_id(otObject)} missing properties parameter`); + } + + if (!otObject.relations) { + throw Error(`OT-JSON object ${_id(otObject)} missing relations parameter`); + } + + switch (_type(otObject)) { case constants.objectType.otObject: { // Create entity vertex. @@ -353,13 +374,13 @@ process.on('message', async (dataFromParent) => { // datasetContext: _context(data), datasetHeader: document.datasetHeader, signature: document.signature, - vertices: vertices.reduce((acc, current) => { + vertices: deduplicateVertices.reduce((acc, current) => { if (!acc.includes(current._key)) { acc.push(current._key); } return acc; }, []), - edges: edges.reduce((acc, current) => { + edges: deduplicateEdges.reduce((acc, current) => { if (!acc.includes(current._key)) { acc.push(current._key); } @@ -370,25 +391,28 @@ process.on('message', async (dataFromParent) => { const total_documents = document['@graph'].length; const root_hash = document.datasetHeader.dataIntegrity.proofs[0].proofValue; + const graphObject = {}; + Object.assign(graphObject, ImportUtilities.unpackKeysAndSortVertices({ + vertices: deduplicateVertices, + edges: deduplicateEdges, + })); + const data_hash = Utilities.normalizeHex(sha3_256(`${graphObject}`)); + const response = { - vertices, - edges, + vertices: deduplicateVertices, + edges: deduplicateEdges, metadata, datasetId, - header, - dataCreator, - wallet, total_documents, root_hash, - deduplicateEdges, - deduplicateVertices, - handler_id, + data_hash, }; + process.send(JSON.stringify(response), () => { process.exit(0); }); } catch (e) { - process.send({ error: e.message }); + process.send({ error: `${e.message}\n${e.stack}` }); } }); diff --git a/modules/worker/import-worker-controller.js b/modules/worker/import-worker-controller.js index 11aca1ddb0..658ed1609c 100644 --- a/modules/worker/import-worker-controller.js +++ b/modules/worker/import-worker-controller.js @@ -1,3 +1,4 @@ +const fs = require('fs'); const { fork } = require('child_process'); const ImportUtilities = require('../ImportUtilities'); const bytes = require('utf8-length'); @@ -22,12 +23,16 @@ class ImportWorkerController { async startGraphConverterWorker(command) { this.logger.info('Starting graph converter worker'); const { - document, + documentPath, handler_id, encryptedMap, + data_provider_wallet, purchased, } = command.data; + let document = fs.readFileSync(documentPath, { encoding: 'utf-8' }); + const otjson_size_in_bytes = bytes(document); + document = JSON.parse(document); // Extract wallet from signature. const wallet = ImportUtilities.extractDatasetSigner( document, @@ -36,8 +41,6 @@ class ImportWorkerController { await this.importService.validateDocument(document); - const otjson_size_in_bytes = bytes(JSON.stringify(document)); - const forked = fork('modules/worker/graph-converter-worker.js'); forked.send(JSON.stringify({ @@ -46,31 +49,28 @@ class ImportWorkerController { forked.on('message', async (response) => { if (response.error) { - await this._sendErrorToFinalizeCommand(response.error, handler_id); + await this._sendErrorToFinalizeCommand(response.error, handler_id, documentPath); forked.kill(); return; } const parsedData = JSON.parse(response); + + fs.writeFileSync(documentPath, JSON.stringify({ + vertices: parsedData.vertices, + edges: parsedData.edges, + metadata: parsedData.metadata, + })); + const commandData = { - dbData: { - vertices: parsedData.vertices, - edges: parsedData.edges, - metadata: parsedData.metadata, - datasetId: parsedData.datasetId, - header: parsedData.header, - dataCreator: parsedData.dataCreator, - }, - afterImportData: { - wallet: parsedData.wallet, - total_documents: parsedData.total_documents, - root_hash: parsedData.root_hash, - vertices: parsedData.deduplicateEdges, - edges: parsedData.deduplicateVertices, - data_set_id: parsedData.datasetId, - handler_id: parsedData.handler_id, - otjson_size_in_bytes, - purchased, - }, + handler_id, + documentPath, + data_set_id: parsedData.datasetId, + root_hash: parsedData.root_hash, + data_hash: parsedData.data_hash, + total_documents: parsedData.total_documents, + otjson_size_in_bytes, + data_provider_wallet, + purchased, }; await this.commandExecutor.add({ @@ -86,36 +86,36 @@ class ImportWorkerController { async startOtjsonConverterWorker(command, standardId) { this.logger.info('Starting ot-json converter worker'); - const { document, handler_id } = command.data; - + const { documentPath, handler_id } = command.data; + const document = fs.readFileSync(documentPath, { encoding: 'utf-8' }); const forked = fork('modules/worker/otjson-converter-worker.js'); forked.send(JSON.stringify({ config: this.config, dataset: document, standardId })); forked.on('message', async (response) => { if (response.error) { - await this._sendErrorToFinalizeCommand(response.error, handler_id); - forked.kill(); - return; + await this._sendErrorToFinalizeCommand(response.error, handler_id, documentPath); + } else { + const otjson = JSON.parse(response); + const signedOtjson = ImportUtilities.signDataset(otjson, this.config, this.web3); + fs.writeFileSync(documentPath, JSON.stringify(signedOtjson)); + const commandData = { + documentPath, + handler_id, + }; + await this.commandExecutor.add({ + name: command.sequence[0], + sequence: command.sequence.slice(1), + delay: 0, + data: commandData, + transactional: false, + }); } - const otjson = JSON.parse(response); - const signedOtjson = ImportUtilities.signDataset(otjson, this.config, this.web3); - const commandData = { - document: signedOtjson, - handler_id, - }; - await this.commandExecutor.add({ - name: command.sequence[0], - sequence: command.sequence.slice(1), - delay: 0, - data: commandData, - transactional: false, - }); forked.kill(); }); } - async _sendErrorToFinalizeCommand(error, handler_id) { + async _sendErrorToFinalizeCommand(error, handler_id, documentPath) { await this.commandExecutor.add({ name: 'dcFinalizeImportCommand', delay: 0, @@ -123,6 +123,7 @@ class ImportWorkerController { data: { error: { message: error }, handler_id, + documentPath, }, }); } diff --git a/modules/worker/otjson-converter-worker.js b/modules/worker/otjson-converter-worker.js index a83944a2d4..46792fae1c 100644 --- a/modules/worker/otjson-converter-worker.js +++ b/modules/worker/otjson-converter-worker.js @@ -3,24 +3,25 @@ const WotOtJsonTranspiler = require('.././transpiler/wot/wot-otjson-transpiler') process.on('message', (data) => { try { - data = JSON.parse(data); + const { standardId, config, dataset } = JSON.parse(data); let transpiler; - switch (data.standardId) { + switch (standardId) { case 'gs1': { - transpiler = new EpcisOtJsonTranspiler({ config: data.config }); + transpiler = new EpcisOtJsonTranspiler({ config }); break; } case 'wot': { - transpiler = new WotOtJsonTranspiler({ config: data.config }); + transpiler = new WotOtJsonTranspiler({ config }); break; } default: - process.send({ error: `Unsupported standardId: ${data.standardId}` }); + process.send({ error: `Unsupported standardId: ${standardId}` }); return; } - const stringifiedJson = transpiler.convertToOTJson(data.dataset); + + const stringifiedJson = transpiler.convertToOTJson(dataset); process.send(stringifiedJson); } catch (e) { - process.send({ error: e.message }); + process.send({ error: `${e.message}\n${e.stack}` }); } }); diff --git a/package-lock.json b/package-lock.json index 3a8cac0ed3..47973849bd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "4.0.0", + "version": "4.0.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 8414ea00b7..4aa054ab32 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "4.0.2", + "version": "4.0.3", "description": "OriginTrail node", "main": ".eslintrc.js", "config": { diff --git a/test/bdd/features/litigation.feature b/test/bdd/features/litigation.feature index 1b4e0c752e..18852798f4 100644 --- a/test/bdd/features/litigation.feature +++ b/test/bdd/features/litigation.feature @@ -3,42 +3,6 @@ Feature: Test various litigation scenarios Given the blockchain is set up And 1 bootstrap is running - @first - Scenario: Test litigation for one holder which is not responding - Given the replication difficulty is 0 - And I setup 4 nodes - And I override configuration for all nodes - | dc_holding_time_in_minutes | 5 | - | numberOfChallenges | 100 | - | challengeResponseTimeMills | 5000 | - And I start the nodes - And I use 1st node as DC - And DC imports "importers/xml_examples/Retail/01_Green_to_pink_shipment.xml" as GS1-EPCIS - And DC waits for import to finish - Then DC's last import's hash should be the same as one manually calculated - Given DC initiates the replication for last imported dataset - And DC waits for last offer to get written to blockchain - And I wait for replications to finish - Then the last root hash should be the same as one manually calculated - Then the last import should be the same on all nodes that replicated data - And I wait for challenges to start - And I corrupt 1st holder's database ot_vertices collection - And I wait for litigation initiation - And I corrupt 2nd holder's database ot_vertices collection - Then Litigator should delay other litigations while one is running - Then 1st holder to litigate should answer litigation - Then Litigator node should have completed litigation - Then 1st started holder should have been penalized -# Then Litigator should have started replacement for penalized holder -# Then I wait for 4 replacement replications to finish -# Then I wait for replacement to be completed -# Then 2nd holder to litigate should answer litigation -# Then Litigator node should have completed litigation -# Then 2nd started holder should have been penalized -# Then Litigator should have started replacement for penalized holder -# Then I wait for 3 replacement replications to finish -# Then I wait for replacement to be completed - @second Scenario: Test litigation for one holder which has failed to answer challenge but succeeded to answer litigation (wrongly) Given the replication difficulty is 0 @@ -94,7 +58,7 @@ Feature: Test various litigation scenarios Then Litigator node should have completed litigation Then 1st started holder should not have been penalized - @fourth + @skip #to be done when we finish replacement Scenario: Test litigation case where same new nodes will apply for same offer Given the replication difficulty is 0 And I setup 4 nodes diff --git a/test/bdd/steps/datalayer.js b/test/bdd/steps/datalayer.js index f2d171b86e..05dfcde6ea 100644 --- a/test/bdd/steps/datalayer.js +++ b/test/bdd/steps/datalayer.js @@ -100,7 +100,7 @@ Then(/^(DC|DH)'s (\d+) dataset hashes should match blockchain values$/, async fu const dataCreator = { identifiers: [ { - identifierValue: dataset.datasetHeader.dataCreator.identifiers[0].identifierValue, + identifierValue: ImportUtilities.getDataCreator(dataset.datasetHeader), identifierType: 'ERC725', validationSchema: '/schemas/erc725-main', },