diff --git a/modules/EventEmitter.js b/modules/EventEmitter.js index ef8bc2f197..425b549caf 100644 --- a/modules/EventEmitter.js +++ b/modules/EventEmitter.js @@ -100,6 +100,7 @@ class EventEmitter { dcService, dvController, commandExecutor, + dhController, } = this.ctx; this._on('api-trail', (data) => { @@ -435,6 +436,7 @@ class EventEmitter { dcService, dvController, dcController, + dhController, networkService, } = this.ctx; @@ -512,11 +514,14 @@ class EventEmitter { } } - const { offerId, wallet, dhIdentity } = replicationMessage; + const { + offerId, wallet, dhIdentity, + async_enabled, + } = replicationMessage; const identity = transport.extractSenderID(request); try { await dcService.handleReplicationRequest( - offerId, wallet, identity, dhIdentity, + offerId, wallet, identity, dhIdentity, async_enabled, response, ); } catch (error) { @@ -533,11 +538,49 @@ class EventEmitter { } }); + this._on('kad-replication-data', async (request, response) => { + const kadReplicationRequest = transport.extractMessage(request); + let replicationMessage = kadReplicationRequest; + + if (kadReplicationRequest.messageSignature) { + const { message, messageSignature } = kadReplicationRequest; + replicationMessage = message; + + if (!Utilities.isMessageSigned(this.web3, message, messageSignature)) { + logger.warn(`We have a forger here. Signature doesn't match for message: ${JSON.stringify(message)}`); + return; + } + } + + const senderIdentity = transport.extractSenderID(request); + try { + await dhController.handleReplicationData( + senderIdentity, + replicationMessage, + response, + ); + } catch (error) { + const errorMessage = `Failed to handle replication data. ${error}.`; + logger.warn(errorMessage); + + try { + await transport.sendResponse(response, { + status: 'fail', + }); + } catch (e) { + logger.error(`Failed to send response 'fail' status. Error: ${e}.`); // TODO handle this case + } + } + }); + // sync this._on('kad-replacement-replication-request', async (request, response) => { try { const message = transport.extractMessage(request); - const { offerId, wallet, dhIdentity } = message; + const { + offerId, wallet, dhIdentity, + async_enabled, + } = message; const { wallet: senderWallet } = transport.extractSenderInfo(request); const identity = transport.extractSenderID(request); @@ -546,7 +589,7 @@ class EventEmitter { } await dcService.handleReplacementRequest( - offerId, wallet, identity, dhIdentity, + offerId, wallet, identity, dhIdentity, async_enabled, response, ); } catch (error) { diff --git a/modules/command/dc/dc-replication-send-command.js b/modules/command/dc/dc-replication-send-command.js new file mode 100644 index 0000000000..f9ed86fbb3 --- /dev/null +++ b/modules/command/dc/dc-replication-send-command.js @@ -0,0 +1,147 @@ +const BN = require('bn.js'); +const Command = require('../command'); +const Utilities = require('../../Utilities'); +const Encryption = require('../../RSAEncryption'); +const Models = require('../../../models/index'); + +/** + * Handles replication request + */ +class DcReplicationSendCommand extends Command { + constructor(ctx) { + super(ctx); + this.config = ctx.config; + this.logger = ctx.logger; + this.transport = ctx.transport; + this.web3 = ctx.web3; + + this.replicationService = ctx.replicationService; + this.permissionedDataService = ctx.permissionedDataService; + this.importService = ctx.importService; + } + + /** + * Creates an offer in the database + * @param command + * @returns {Promise<{commands}>} + */ + async execute(command) { + const { + internalOfferId, wallet, identity, dhIdentity, offerId, + } = command.data; + + + const usedDH = await Models.replicated_data.findOne({ + where: { + dh_id: identity, + dh_wallet: wallet, + dh_identity: dhIdentity, + offer_id: offerId, + }, + }); + + let colorNumber = Utilities.getRandomInt(2); + if (usedDH != null && usedDH.status === 'STARTED' && usedDH.color) { + colorNumber = usedDH.color; + } + + const color = this.replicationService.castNumberToColor(colorNumber); + + const offer = await Models.offers.findOne({ where: { id: internalOfferId } }); + const replication = await this.replicationService.loadReplication(offer.id, color); + + if (!usedDH) { + await Models.replicated_data.create({ + dh_id: identity, + dh_wallet: wallet.toLowerCase(), + dh_identity: dhIdentity.toLowerCase(), + offer_id: offer.offer_id, + litigation_private_key: replication.litigationPrivateKey, + litigation_public_key: replication.litigationPublicKey, + distribution_public_key: replication.distributionPublicKey, + distribution_private_key: replication.distributionPrivateKey, + distribution_epk_checksum: replication.distributionEpkChecksum, + litigation_root_hash: replication.litigationRootHash, + distribution_root_hash: replication.distributionRootHash, + distribution_epk: replication.distributionEpk, + status: 'STARTED', + color: colorNumber, + }); + } + + const toSign = [ + Utilities.denormalizeHex(new BN(replication.distributionEpkChecksum).toString('hex')), + Utilities.denormalizeHex(replication.distributionRootHash), + ]; + const distributionSignature = Encryption.signMessage( + this.web3, toSign, + Utilities.normalizeHex(this.config.node_private_key), + ); + + const permissionedData = await this.permissionedDataService.getAllowedPermissionedData( + offer.data_set_id, + identity, + ); + + const promises = []; + for (const ot_object_id in permissionedData) { + promises.push(this.importService.getOtObjectById(offer.data_set_id, ot_object_id)); + } + + const ot_objects = await Promise.all(promises); + + await this.permissionedDataService.attachPermissionedDataToMap( + permissionedData, + ot_objects, + ); + + const payload = { + offer_id: offer.offer_id, + data_set_id: offer.data_set_id, + dc_wallet: this.config.node_wallet, + dcIdentity: this.config.erc725Identity, + dcNodeId: this.config.network.identity, + otJson: replication.otJson, + permissionedData, + litigation_public_key: replication.litigationPublicKey, + distribution_public_key: replication.distributionPublicKey, + distribution_private_key: replication.distributionPrivateKey, + distribution_epk_checksum: replication.distributionEpkChecksum, + litigation_root_hash: replication.litigationRootHash, + distribution_root_hash: replication.distributionRootHash, + distribution_epk: replication.distributionEpk, + distribution_signature: distributionSignature.signature, + transaction_hash: offer.transaction_hash, + distributionSignature, + color: colorNumber, + }; + + // send replication to DH + const response = await this.transport.replicationData(payload, identity); + + if (response.status === 'fail') { + this.logger.warn(`Sending replication data for offer ${offer.id} to ${identity} failed. ${response.message}`); + } else { + this.logger.info(`Successfully sent replication data for offer_id ${offer.offer_id} to node ${identity}.`); + } + + return Command.empty(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'dcReplicationSendCommand', + delay: 0, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +module.exports = DcReplicationSendCommand; diff --git a/modules/command/dh/dh-offer-handle-command.js b/modules/command/dh/dh-offer-handle-command.js index 890c22d83e..5fff377b80 100644 --- a/modules/command/dh/dh-offer-handle-command.js +++ b/modules/command/dh/dh-offer-handle-command.js @@ -1,7 +1,7 @@ const path = require('path'); -const fs = require('fs'); + const Command = require('../command'); -const Models = require('../../../models/index'); +const Models = require('../../../models'); const Utilities = require('../../Utilities'); /** @@ -13,7 +13,7 @@ class DHOfferHandleCommand extends Command { this.logger = ctx.logger; this.config = ctx.config; this.transport = ctx.transport; - this.blockchain = ctx.blockchain; + this.commandExecutor = ctx.commandExecutor; } /** @@ -26,7 +26,7 @@ class DHOfferHandleCommand extends Command { dcNodeId, } = command.data; - this.logger.trace(`Sending replication request for offer ${offerId} to ${dcNodeId}.`); + this.logger.trace(`Sending replication request for offer ${offerId} to node ${dcNodeId}.`); const response = await this.transport.replicationRequest({ offerId, wallet: this.config.node_wallet, @@ -58,7 +58,24 @@ class DHOfferHandleCommand extends Command { bid.status = 'SENT'; await bid.save({ fields: ['status'] }); - this.logger.notify(`Replication request for ${offerId} sent to ${dcNodeId}. Response received.`); + if (response.status === 'acknowledge') { + this.logger.notify(`Received replication request acknowledgement for offer_id ${offerId} from node ${dcNodeId}.`); + + return { + commands: [ + { + name: 'dhReplicationTimeoutCommand', + delay: this.config.dc_choose_time, + data: { + offerId, + dcNodeId, + }, + }, + ], + }; + } + + this.logger.notify(`Received replication data for offer_id ${offerId} from node ${dcNodeId}.`); const cacheDirectory = path.join(this.config.appDataPath, 'import_cache'); diff --git a/modules/command/dh/dh-replication-import-command.js b/modules/command/dh/dh-replication-import-command.js index 53e77071fc..92b7ac2820 100644 --- a/modules/command/dh/dh-replication-import-command.js +++ b/modules/command/dh/dh-replication-import-command.js @@ -89,7 +89,6 @@ class DhReplicationImportCommand extends Command { throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from document root hash ${originalRootHash}`); } - // TODO: Verify EPK checksum // TODO: Verify distribution keys and hashes // TODO: Verify data creator id @@ -168,7 +167,7 @@ class DhReplicationImportCommand extends Command { origin: 'HOLDING', }); } - this.logger.important(`[DH] Replication finished for offer ID ${offerId}`); + this.logger.important(`[DH] Replication finished for offer_id ${offerId}`); const toSign = [ Utilities.denormalizeHex(offerId), @@ -184,7 +183,11 @@ class DhReplicationImportCommand extends Command { }; await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId); - this.logger.info(`Replication request for ${offerId} sent to ${dcNodeId}`); + const bid = await Models.bids.findOne({ where: { offer_id: offerId } }); + bid.status = 'REPLICATED'; + await bid.save({ fields: ['status'] }); + + this.logger.info(`Sent replication finished message for offer_id ${offerId} to node ${dcNodeId}`); return { commands: [ { diff --git a/modules/command/dh/dh-replication-timeout-command.js b/modules/command/dh/dh-replication-timeout-command.js new file mode 100644 index 0000000000..59c6bb1d7c --- /dev/null +++ b/modules/command/dh/dh-replication-timeout-command.js @@ -0,0 +1,69 @@ +const Command = require('../command'); +const Models = require('../../../models'); + +/** + * Handles new offer from the DH side + */ +class DHOfferTimeoutCommand extends Command { + constructor(ctx) { + super(ctx); + this.logger = ctx.logger; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute(command) { + const { + offerId, + dcNodeId, + } = command.data; + + + const bid = await Models.bids.findOne({ + where: { offer_id: offerId, status: 'SENT' }, + }); + + if (bid) { + bid.status = 'EXPIRED'; + this.logger.warn(`Offer ${offerId} has not been replicated.`); + } + + return Command.empty(); + } + + /** + * Try to recover command + * @param command + * @param err + * @return {Promise<{commands: *[]}>} + */ + async recover(command, err) { + const { + offerId, + } = command.data; + + const bid = await Models.bids.findOne({ where: { offer_id: offerId } }); + bid.status = 'FAILED'; + await bid.save({ fields: ['status'] }); + return Command.empty(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'dhOfferHandleCommand', + delay: 0, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +module.exports = DHOfferTimeoutCommand; diff --git a/modules/controller/dh-controller.js b/modules/controller/dh-controller.js index 7b915f2a7b..803725773f 100644 --- a/modules/controller/dh-controller.js +++ b/modules/controller/dh-controller.js @@ -1,3 +1,5 @@ +const path = require('path'); + const Utilities = require('../Utilities'); const Models = require('../../models'); const constants = require('../constants'); @@ -12,6 +14,8 @@ class DHController { this.blockchain = ctx.blockchain; this.graphStorage = ctx.graphStorage; this.importService = ctx.importService; + this.transport = ctx.transport; + this.commandExecutor = ctx.commandExecutor; } isParameterProvided(request, response, parameter_name) { @@ -30,6 +34,44 @@ class DHController { return true; } + async handleReplicationData(dcNodeId, replicationMessage, response) { + try { + const { + offer_id: offerId, otJson, permissionedData, + } = replicationMessage; + + + this.logger.notify(`Received replication data for offer_id ${offerId} from node ${dcNodeId}.`); + + const cacheDirectory = path.join(this.config.appDataPath, 'import_cache'); + + await Utilities.writeContentsToFile( + cacheDirectory, + offerId, + JSON.stringify({ + otJson, + permissionedData, + }), + ); + + const packedResponse = DHController._stripResponse(replicationMessage); + Object.assign(packedResponse, { + dcNodeId, + documentPath: path.join(cacheDirectory, offerId), + }); + + await this.commandExecutor.add({ + name: 'dhReplicationImportCommand', + data: packedResponse, + transactional: false, + }); + } catch (e) { + await this.transport.sendResponse(response, { status: 'fail', message: e }); + } + + await this.transport.sendResponse(response, { status: 'success' }); + } + async whitelistViewer(request, response) { this.logger.api('POST: Whitelisting of data viewer request received.'); @@ -190,6 +232,28 @@ class DHController { return response.concat(trailExtension); } } + + /** + * Parse network response + * @param response - Network response + * @private + */ + static _stripResponse(response) { + return { + offerId: response.offer_id, + dataSetId: response.data_set_id, + dcWallet: response.dc_wallet, + dcNodeId: response.dcNodeId, + litigationPublicKey: response.litigation_public_key, + litigationRootHash: response.litigation_root_hash, + distributionPublicKey: response.distribution_public_key, + distributionPrivateKey: response.distribution_private_key, + distributionEpk: response.distribution_epk, + transactionHash: response.transaction_hash, + encColor: response.color, + dcIdentity: response.dcIdentity, + }; + } } module.exports = DHController; diff --git a/modules/network/http/http-network.js b/modules/network/http/http-network.js index 208b60ee04..1962704338 100644 --- a/modules/network/http/http-network.js +++ b/modules/network/http/http-network.js @@ -27,6 +27,15 @@ class HttpNetwork { return HttpNetwork.send(contact.hostname, data, this.config.identity); }; + this.node.replicationData = async (message, contactId) => { + const data = { + type: 'kad-replication-data', + message, + }; + const contact = await this.node.getContact(contactId); + return HttpNetwork.send(contact.hostname, data, this.config.identity); + }; + this.node.replicationFinished = async (message, contactId) => { const data = { type: 'kad-replication-finished', diff --git a/modules/network/kademlia/kademlia.js b/modules/network/kademlia/kademlia.js index 3bb326d50b..97c7c3d80e 100644 --- a/modules/network/kademlia/kademlia.js +++ b/modules/network/kademlia/kademlia.js @@ -25,6 +25,7 @@ const { NetworkRequestIgnoredError } = require('../../errors/index'); const directMessageRequests = [ { methodName: 'replicationRequest', routeName: 'kad-replication-request' }, + { methodName: 'replicationData', routeName: 'kad-replication-data' }, { methodName: 'replacementReplicationRequest', routeName: 'kad-replacement-replication-request' }, { methodName: 'replicationFinished', routeName: 'kad-replication-finished' }, { methodName: 'replacementReplicationFinished', routeName: 'kad-replacement-replication-finished' }, @@ -175,7 +176,8 @@ class Kademlia { 'kad-data-price-request', 'kad-data-price-response', 'kad-permissioned-data-read-response', 'kad-permissioned-data-read-request', 'kad-send-encrypted-key', 'kad-encrypted-key-process-result', - 'kad-replication-request', 'kad-replacement-replication-request', 'kad-replacement-replication-finished', + 'kad-replication-request', 'kad-replacement-replication-request', + 'kad-replication-data', 'kad-replacement-replication-finished', 'kad-public-key-request', 'kad-purchase-complete', ], difficulty: this.config.network.solutionDifficulty, @@ -495,10 +497,14 @@ class Kademlia { // sync this.node.use('kad-replication-request', (request, response, next) => { - this.log.debug('kad-replication-request received'); this.emitter.emit('kad-replication-request', request, response); }); + // sync + this.node.use('kad-replication-data', (request, response, next) => { + this.emitter.emit('kad-replication-data', request, response); + }); + // sync this.node.use('kad-replacement-replication-request', (request, response, next) => { this.log.debug('kad-replacement-replication-request received'); @@ -514,7 +520,6 @@ class Kademlia { // async this.node.use('kad-replication-finished', (request, response, next) => { - this.log.debug('kad-replication-finished received'); this.emitter.emit('kad-replication-finished', request, response); response.send([]); }); diff --git a/modules/service/dc-service.js b/modules/service/dc-service.js index d7b9ed8318..772852f7b2 100644 --- a/modules/service/dc-service.js +++ b/modules/service/dc-service.js @@ -263,11 +263,12 @@ class DCService { * @param wallet - DH wallet * @param identity - Network identity * @param dhIdentity - DH ERC725 identity + * @param async_enabled - Whether or not the DH supports asynchronous communication * @param response - Network response * @returns {Promise} */ - async handleReplicationRequest(offerId, wallet, identity, dhIdentity, response) { - this.logger.info(`Request for replication of offer external ID ${offerId} received. Sender ${identity}`); + async handleReplicationRequest(offerId, wallet, identity, dhIdentity, async_enabled, response) { + this.logger.info(`Received replication request for offer_id ${offerId} from node ${identity}.`); if (!offerId || !wallet || !dhIdentity) { const message = 'Asked replication without providing offer ID or wallet or identity.'; @@ -307,7 +308,25 @@ class DCService { return; } - await this._sendReplication(offer, wallet, identity, dhIdentity, response); + if (async_enabled) { + await this._sendReplicationAcknowledgement(offerId, identity, response); + + await this.commandExecutor.add({ + name: 'dcReplicationSendCommand', + delay: 0, + data: { + internalOfferId: offer.id, + offerId, + wallet, + identity, + dhIdentity, + response, + }, + transactional: false, + }); + } else { + await this._sendReplication(offer, wallet, identity, dhIdentity, response); + } } /** @@ -340,10 +359,11 @@ class DCService { * @param wallet - DH wallet * @param identity - Network identity * @param dhIdentity - DH ERC725 identity + * @param async_enabled - Whether or not the DH supports asynchronous communication * @param response - Network response * @returns {Promise} */ - async handleReplacementRequest(offerId, wallet, identity, dhIdentity, response) { + async handleReplacementRequest(offerId, wallet, identity, dhIdentity, async_enabled, response) { this.logger.info(`Replacement request for replication of offer ${offerId} received. Sender ${identity}`); if (!offerId || !wallet) { @@ -389,12 +409,50 @@ class DCService { try { await this.transport.sendResponse(response, { status: 'fail', + message: `DH ${identity} already applied for offer, currently with status ${usedDH.status}`, }); } catch (e) { this.logger.error(`Failed to send response 'fail' status. Error: ${e}.`); } } - await this._sendReplication(offer, wallet, identity, dhIdentity, response); + + + if (async_enabled) { + await this._sendReplicationAcknowledgement(offerId, identity, response); + + await this.commandExecutor.add({ + name: 'dcReplicationSendCommand', + delay: 0, + data: { + internalOfferId: offer.id, + wallet, + identity, + dhIdentity, + response, + }, + transactional: false, + }); + } else { + await this._sendReplication(offer, wallet, identity, dhIdentity, response); + } + } + + /** + * Sends a replication acknowledgment to da data holder + * @param offerId - OfferId + * @param dhNetworkIdentity - DH Network identity + * @param response - Network response + * @returns {Promise} + */ + async _sendReplicationAcknowledgement(offerId, dhNetworkIdentity, response) { + const payload = { + offer_id: offerId, + status: 'acknowledge', + }; + + // send replication acknowledgement to DH + await this.transport.sendResponse(response, payload); + this.logger.info(`Sending replication request acknowledgement for offer_id ${offerId} to node ${dhNetworkIdentity}.`); } /** @@ -407,26 +465,42 @@ class DCService { * @returns {Promise} */ async _sendReplication(offer, wallet, identity, dhIdentity, response) { - const colorNumber = Utilities.getRandomInt(2); + const usedDH = await models.replicated_data.findOne({ + where: { + dh_id: identity, + dh_wallet: wallet, + dh_identity: dhIdentity, + offer_id: offer.offer_id, + }, + }); + + let colorNumber = Utilities.getRandomInt(2); + if (usedDH != null && usedDH.status === 'STARTED' && usedDH.color) { + colorNumber = usedDH.color; + } + const color = this.replicationService.castNumberToColor(colorNumber); const replication = await this.replicationService.loadReplication(offer.id, color); - await models.replicated_data.create({ - dh_id: identity, - dh_wallet: wallet.toLowerCase(), - dh_identity: dhIdentity.toLowerCase(), - offer_id: offer.offer_id, - litigation_private_key: replication.litigationPrivateKey, - litigation_public_key: replication.litigationPublicKey, - distribution_public_key: replication.distributionPublicKey, - distribution_private_key: replication.distributionPrivateKey, - distribution_epk_checksum: replication.distributionEpkChecksum, - litigation_root_hash: replication.litigationRootHash, - distribution_root_hash: replication.distributionRootHash, - distribution_epk: replication.distributionEpk, - status: 'STARTED', - color: colorNumber, - }); + + if (!usedDH) { + await models.replicated_data.create({ + dh_id: identity, + dh_wallet: wallet.toLowerCase(), + dh_identity: dhIdentity.toLowerCase(), + offer_id: offer.offer_id, + litigation_private_key: replication.litigationPrivateKey, + litigation_public_key: replication.litigationPublicKey, + distribution_public_key: replication.distributionPublicKey, + distribution_private_key: replication.distributionPrivateKey, + distribution_epk_checksum: replication.distributionEpkChecksum, + litigation_root_hash: replication.litigationRootHash, + distribution_root_hash: replication.distributionRootHash, + distribution_epk: replication.distributionEpk, + status: 'STARTED', + color: colorNumber, + }); + } const toSign = [ Utilities.denormalizeHex(new BN(replication.distributionEpkChecksum).toString('hex')), @@ -476,7 +550,7 @@ class DCService { // send replication to DH await this.transport.sendResponse(response, payload); - this.logger.info(`Replication for offer ID ${offer.id} sent to ${identity}.`); + this.logger.info(`Successfully sent replication data for offer_id ${offer.offer_id} to node ${identity}.`); } /** diff --git a/package.json b/package.json index 0666c6820d..3adf4177af 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "4.1.12", + "version": "4.1.13", "description": "OriginTrail node", "main": ".eslintrc.js", "config": { diff --git a/test/bdd/steps/lib/otnode.js b/test/bdd/steps/lib/otnode.js index 0845ca7284..0e1b3b5b1f 100644 --- a/test/bdd/steps/lib/otnode.js +++ b/test/bdd/steps/lib/otnode.js @@ -234,7 +234,7 @@ class OtNode extends EventEmitter { this.emit('public-key-request'); } else if (line.match(/Export complete.*/gi)) { this.emit('export-complete'); - } else if (line.match(/.*\[DH] Replication finished for offer ID .+/gi)) { + } else if (line.match(/.*\[DH] Replication finished for offer_id .+/gi)) { const offerId = line.match(offerIdRegex)[0]; assert(offerId); this.state.addedBids.push(offerId); @@ -242,13 +242,13 @@ class OtNode extends EventEmitter { } else if (line.match(/I've been chosen for offer .+\./gi)) { const offerId = line.match(offerIdRegex)[0]; this.state.takenBids.push(offerId); - } else if (line.match(/Replication for offer ID .+ sent to .+/gi)) { - const internalOfferId = line.match(uuidRegex)[0]; + } else if (line.match(/Successfully sent replication data for offer_id .+ to node .+\./gi)) { + const offer_id = line.match(offerIdRegex)[0]; const dhId = line.match(identityRegex)[0]; - assert(internalOfferId); + assert(offer_id); assert(dhId); - this.state.replications.push({ internalOfferId, dhId }); - this.emit('dh-replicated', { internalOfferId, dhId }); + this.state.replications.push({ offer_id, dhId }); + this.emit('dh-replicated', { offer_id, dhId }); } else if (line.includes('Get profile by wallet ')) { // note that node's wallet can also be access via nodeConfiguration directly const wallet = line.match(walletRegex)[0]; diff --git a/test/bdd/steps/network.js b/test/bdd/steps/network.js index 9517a1fff5..dec34844af 100644 --- a/test/bdd/steps/network.js +++ b/test/bdd/steps/network.js @@ -711,8 +711,8 @@ Then(/^the last import should be the same on all nodes that replicated data$/, a await httpApiHelper.apiImportInfo(dc.state.node_rpc_url, this.state.lastImport.data.dataset_id); const promises = []; - dc.state.replications.forEach(({ internalOfferId, dhId }) => { - if (dc.state.offers.internalIDs[internalOfferId].dataSetId === + dc.state.replications.forEach(({ offer_id, dhId }) => { + if (dc.state.offers.offerIDs[offer_id].dataSetId === this.state.lastImport.data.dataset_id) { const node = this.state.nodes.find(node => node.state.identity === dhId);