Skip to content

Commit

Permalink
Merge pull request #1380 from OriginTrail/prerelease/mainnet
Browse files Browse the repository at this point in the history
OriginTrail Mainnet Release v4.1.13
  • Loading branch information
Kuki145 authored Nov 9, 2020
2 parents dc00429 + 9e15288 commit c87db5e
Show file tree
Hide file tree
Showing 12 changed files with 478 additions and 47 deletions.
51 changes: 47 additions & 4 deletions modules/EventEmitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class EventEmitter {
dcService,
dvController,
commandExecutor,
dhController,
} = this.ctx;

this._on('api-trail', (data) => {
Expand Down Expand Up @@ -435,6 +436,7 @@ class EventEmitter {
dcService,
dvController,
dcController,
dhController,
networkService,
} = this.ctx;

Expand Down Expand Up @@ -512,11 +514,14 @@ class EventEmitter {
}
}

const { offerId, wallet, dhIdentity } = replicationMessage;
const {
offerId, wallet, dhIdentity,
async_enabled,
} = replicationMessage;
const identity = transport.extractSenderID(request);
try {
await dcService.handleReplicationRequest(
offerId, wallet, identity, dhIdentity,
offerId, wallet, identity, dhIdentity, async_enabled,
response,
);
} catch (error) {
Expand All @@ -533,11 +538,49 @@ class EventEmitter {
}
});

this._on('kad-replication-data', async (request, response) => {
const kadReplicationRequest = transport.extractMessage(request);
let replicationMessage = kadReplicationRequest;

if (kadReplicationRequest.messageSignature) {
const { message, messageSignature } = kadReplicationRequest;
replicationMessage = message;

if (!Utilities.isMessageSigned(this.web3, message, messageSignature)) {
logger.warn(`We have a forger here. Signature doesn't match for message: ${JSON.stringify(message)}`);
return;
}
}

const senderIdentity = transport.extractSenderID(request);
try {
await dhController.handleReplicationData(
senderIdentity,
replicationMessage,
response,
);
} catch (error) {
const errorMessage = `Failed to handle replication data. ${error}.`;
logger.warn(errorMessage);

try {
await transport.sendResponse(response, {
status: 'fail',
});
} catch (e) {
logger.error(`Failed to send response 'fail' status. Error: ${e}.`); // TODO handle this case
}
}
});

// sync
this._on('kad-replacement-replication-request', async (request, response) => {
try {
const message = transport.extractMessage(request);
const { offerId, wallet, dhIdentity } = message;
const {
offerId, wallet, dhIdentity,
async_enabled,
} = message;
const { wallet: senderWallet } = transport.extractSenderInfo(request);
const identity = transport.extractSenderID(request);

Expand All @@ -546,7 +589,7 @@ class EventEmitter {
}

await dcService.handleReplacementRequest(
offerId, wallet, identity, dhIdentity,
offerId, wallet, identity, dhIdentity, async_enabled,
response,
);
} catch (error) {
Expand Down
147 changes: 147 additions & 0 deletions modules/command/dc/dc-replication-send-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
const BN = require('bn.js');
const Command = require('../command');
const Utilities = require('../../Utilities');
const Encryption = require('../../RSAEncryption');
const Models = require('../../../models/index');

/**
* Handles replication request
*/
class DcReplicationSendCommand extends Command {
constructor(ctx) {
super(ctx);
this.config = ctx.config;
this.logger = ctx.logger;
this.transport = ctx.transport;
this.web3 = ctx.web3;

this.replicationService = ctx.replicationService;
this.permissionedDataService = ctx.permissionedDataService;
this.importService = ctx.importService;
}

/**
* Creates an offer in the database
* @param command
* @returns {Promise<{commands}>}
*/
async execute(command) {
const {
internalOfferId, wallet, identity, dhIdentity, offerId,
} = command.data;


const usedDH = await Models.replicated_data.findOne({
where: {
dh_id: identity,
dh_wallet: wallet,
dh_identity: dhIdentity,
offer_id: offerId,
},
});

let colorNumber = Utilities.getRandomInt(2);
if (usedDH != null && usedDH.status === 'STARTED' && usedDH.color) {
colorNumber = usedDH.color;
}

const color = this.replicationService.castNumberToColor(colorNumber);

const offer = await Models.offers.findOne({ where: { id: internalOfferId } });
const replication = await this.replicationService.loadReplication(offer.id, color);

if (!usedDH) {
await Models.replicated_data.create({
dh_id: identity,
dh_wallet: wallet.toLowerCase(),
dh_identity: dhIdentity.toLowerCase(),
offer_id: offer.offer_id,
litigation_private_key: replication.litigationPrivateKey,
litigation_public_key: replication.litigationPublicKey,
distribution_public_key: replication.distributionPublicKey,
distribution_private_key: replication.distributionPrivateKey,
distribution_epk_checksum: replication.distributionEpkChecksum,
litigation_root_hash: replication.litigationRootHash,
distribution_root_hash: replication.distributionRootHash,
distribution_epk: replication.distributionEpk,
status: 'STARTED',
color: colorNumber,
});
}

const toSign = [
Utilities.denormalizeHex(new BN(replication.distributionEpkChecksum).toString('hex')),
Utilities.denormalizeHex(replication.distributionRootHash),
];
const distributionSignature = Encryption.signMessage(
this.web3, toSign,
Utilities.normalizeHex(this.config.node_private_key),
);

const permissionedData = await this.permissionedDataService.getAllowedPermissionedData(
offer.data_set_id,
identity,
);

const promises = [];
for (const ot_object_id in permissionedData) {
promises.push(this.importService.getOtObjectById(offer.data_set_id, ot_object_id));
}

const ot_objects = await Promise.all(promises);

await this.permissionedDataService.attachPermissionedDataToMap(
permissionedData,
ot_objects,
);

const payload = {
offer_id: offer.offer_id,
data_set_id: offer.data_set_id,
dc_wallet: this.config.node_wallet,
dcIdentity: this.config.erc725Identity,
dcNodeId: this.config.network.identity,
otJson: replication.otJson,
permissionedData,
litigation_public_key: replication.litigationPublicKey,
distribution_public_key: replication.distributionPublicKey,
distribution_private_key: replication.distributionPrivateKey,
distribution_epk_checksum: replication.distributionEpkChecksum,
litigation_root_hash: replication.litigationRootHash,
distribution_root_hash: replication.distributionRootHash,
distribution_epk: replication.distributionEpk,
distribution_signature: distributionSignature.signature,
transaction_hash: offer.transaction_hash,
distributionSignature,
color: colorNumber,
};

// send replication to DH
const response = await this.transport.replicationData(payload, identity);

if (response.status === 'fail') {
this.logger.warn(`Sending replication data for offer ${offer.id} to ${identity} failed. ${response.message}`);
} else {
this.logger.info(`Successfully sent replication data for offer_id ${offer.offer_id} to node ${identity}.`);
}

return Command.empty();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'dcReplicationSendCommand',
delay: 0,
transactional: false,
};
Object.assign(command, map);
return command;
}
}

module.exports = DcReplicationSendCommand;
27 changes: 22 additions & 5 deletions modules/command/dh/dh-offer-handle-command.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const path = require('path');
const fs = require('fs');

const Command = require('../command');
const Models = require('../../../models/index');
const Models = require('../../../models');
const Utilities = require('../../Utilities');

/**
Expand All @@ -13,7 +13,7 @@ class DHOfferHandleCommand extends Command {
this.logger = ctx.logger;
this.config = ctx.config;
this.transport = ctx.transport;
this.blockchain = ctx.blockchain;
this.commandExecutor = ctx.commandExecutor;
}

/**
Expand All @@ -26,7 +26,7 @@ class DHOfferHandleCommand extends Command {
dcNodeId,
} = command.data;

this.logger.trace(`Sending replication request for offer ${offerId} to ${dcNodeId}.`);
this.logger.trace(`Sending replication request for offer ${offerId} to node ${dcNodeId}.`);
const response = await this.transport.replicationRequest({
offerId,
wallet: this.config.node_wallet,
Expand Down Expand Up @@ -58,7 +58,24 @@ class DHOfferHandleCommand extends Command {
bid.status = 'SENT';
await bid.save({ fields: ['status'] });

this.logger.notify(`Replication request for ${offerId} sent to ${dcNodeId}. Response received.`);
if (response.status === 'acknowledge') {
this.logger.notify(`Received replication request acknowledgement for offer_id ${offerId} from node ${dcNodeId}.`);

return {
commands: [
{
name: 'dhReplicationTimeoutCommand',
delay: this.config.dc_choose_time,
data: {
offerId,
dcNodeId,
},
},
],
};
}

this.logger.notify(`Received replication data for offer_id ${offerId} from node ${dcNodeId}.`);

const cacheDirectory = path.join(this.config.appDataPath, 'import_cache');

Expand Down
9 changes: 6 additions & 3 deletions modules/command/dh/dh-replication-import-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class DhReplicationImportCommand extends Command {
throw Error(`Calculated root hash ${decryptedGraphRootHash} differs from document root hash ${originalRootHash}`);
}


// TODO: Verify EPK checksum
// TODO: Verify distribution keys and hashes
// TODO: Verify data creator id
Expand Down Expand Up @@ -168,7 +167,7 @@ class DhReplicationImportCommand extends Command {
origin: 'HOLDING',
});
}
this.logger.important(`[DH] Replication finished for offer ID ${offerId}`);
this.logger.important(`[DH] Replication finished for offer_id ${offerId}`);

const toSign = [
Utilities.denormalizeHex(offerId),
Expand All @@ -184,7 +183,11 @@ class DhReplicationImportCommand extends Command {
};

await this.transport.replicationFinished(replicationFinishedMessage, dcNodeId);
this.logger.info(`Replication request for ${offerId} sent to ${dcNodeId}`);
const bid = await Models.bids.findOne({ where: { offer_id: offerId } });
bid.status = 'REPLICATED';
await bid.save({ fields: ['status'] });

this.logger.info(`Sent replication finished message for offer_id ${offerId} to node ${dcNodeId}`);
return {
commands: [
{
Expand Down
69 changes: 69 additions & 0 deletions modules/command/dh/dh-replication-timeout-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
const Command = require('../command');
const Models = require('../../../models');

/**
* Handles new offer from the DH side
*/
class DHOfferTimeoutCommand extends Command {
constructor(ctx) {
super(ctx);
this.logger = ctx.logger;
}

/**
* Executes command and produces one or more events
* @param command
*/
async execute(command) {
const {
offerId,
dcNodeId,
} = command.data;


const bid = await Models.bids.findOne({
where: { offer_id: offerId, status: 'SENT' },
});

if (bid) {
bid.status = 'EXPIRED';
this.logger.warn(`Offer ${offerId} has not been replicated.`);
}

return Command.empty();
}

/**
* Try to recover command
* @param command
* @param err
* @return {Promise<{commands: *[]}>}
*/
async recover(command, err) {
const {
offerId,
} = command.data;

const bid = await Models.bids.findOne({ where: { offer_id: offerId } });
bid.status = 'FAILED';
await bid.save({ fields: ['status'] });
return Command.empty();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'dhOfferHandleCommand',
delay: 0,
transactional: false,
};
Object.assign(command, map);
return command;
}
}

module.exports = DHOfferTimeoutCommand;
Loading

0 comments on commit c87db5e

Please sign in to comment.