Skip to content

Commit

Permalink
Version 1.3.8 (#471)
Browse files Browse the repository at this point in the history
* Catchup with develop
  • Loading branch information
kipliklotrika authored Aug 17, 2018
1 parent 38677de commit 483705f
Show file tree
Hide file tree
Showing 18 changed files with 998 additions and 31 deletions.
4 changes: 2 additions & 2 deletions modules/Blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ class Blockchain {
* @param {number} - importId
* @returns {Promise}
*/
cancelEscrow(dhWallet, importId) {
return this.blockchain.cancelEscrow(dhWallet, importId);
cancelEscrow(dhWallet, importId, dhIsSender) {
return this.blockchain.cancelEscrow(dhWallet, importId, dhIsSender);
}

/**
Expand Down
29 changes: 20 additions & 9 deletions modules/Blockchain/Ethereum/Transactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ class Transactions {
this.queue = new Queue((async (args, cb) => {
const { transaction, future } = args;
try {
const delta = (Date.now() - this.lastTransactionTime);
if (delta < 2000) {
await sleep.sleep(2000);
}
const result = await this._sendTransaction(transaction);
if (result.status === '0x0') {
future.reject(result);
} else {
future.resolve(result);
for (let i = 0; i < 3; i += 1) {
try {
// eslint-disable-next-line no-await-in-loop
const result = await this._sendTransaction(transaction);
if (result.status === '0x0') {
future.reject(result);
break;
} else {
future.resolve(result);
break;
}
} catch (error) {
this.log.trace(`Nonce too low / underpriced detected. Retrying. ${error.toString()}`);
if (!error.toString().includes('nonce too low') && !error.toString().includes('underpriced')) {
throw new Error(error);
}
// eslint-disable-next-line no-await-in-loop
await sleep.sleep(2000);
}
}
} catch (e) {
future.reject(e);
Expand Down Expand Up @@ -71,6 +81,7 @@ class Transactions {
this.log.warn(`ETH balance running low! Your balance: ${currentBalance.toString()} wei, while minimum required is: ${requiredAmount.toString()} wei`);
}

this.log.trace(`Sending transaction to blockchain, nonce ${newTransaction.options.nonce}, balance is ${currentBalance.toString()}`);
return this.web3.eth.sendSignedTransaction(`0x${serializedTx}`);
}

Expand Down
5 changes: 3 additions & 2 deletions modules/Blockchain/Ethereum/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class Ethereum {
* @param {number} - importId
* @returns {Promise}
*/
cancelEscrow(dhWallet, importId) {
cancelEscrow(dhWallet, importId, dhIsSender) {
const options = {
gasLimit: this.web3.utils.toHex(this.config.gas_limit),
gasPrice: this.web3.utils.toHex(this.config.gas_price),
Expand All @@ -380,8 +380,9 @@ class Ethereum {
this.escrowContractAbi,
'cancelEscrow',
[
dhWallet,
importId,
dhWallet,
dhIsSender,
],
options,
);
Expand Down
24 changes: 12 additions & 12 deletions modules/EventEmitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class EventEmitter {
}
data.response.send(res);
}).catch((error) => {
logger.error(`Failed to get trail for query ${data.query}`);
logger.error(`Failed to get trail for query ${JSON.stringify(data.query)}`);
notifyError(error);
data.response.status(500);
data.response.send({
Expand Down Expand Up @@ -167,7 +167,7 @@ class EventEmitter {
});

this._on('api-get-imports', (data) => {
logger.info(`Get imports triggered with query ${data.query}`);
logger.info(`Get imports triggered with query ${JSON.stringify(data.query)}`);
product.getImports(data.query).then((res) => {
if (res.length === 0) {
data.response.status(204);
Expand All @@ -176,7 +176,7 @@ class EventEmitter {
}
data.response.send(res);
}).catch((error) => {
logger.error(`Failed to get imports for query ${data.query}`);
logger.error(`Failed to get imports for query ${JSON.stringify(data.query)}`);
notifyError(error);
data.response.status(500);
data.response.send({
Expand All @@ -186,7 +186,7 @@ class EventEmitter {
});

this._on('api-query', (data) => {
logger.info(`Get veritces triggered with query ${data.query}`);
logger.info(`Get veritces triggered with query ${JSON.stringify(data.query)}`);
product.getVertices(data.query).then((res) => {
if (res.length === 0) {
data.response.status(204);
Expand All @@ -195,11 +195,11 @@ class EventEmitter {
}
data.response.send(res);
}).catch((error) => {
logger.error(`Failed to get vertices for query ${data.query}`);
logger.error(`Failed to get vertices for query ${JSON.stringify(data.query)}`);
notifyError(error);
data.response.status(500);
data.response.send({
message: `Failed to get vertices for query ${data.query}`,
message: `Failed to get vertices for query ${JSON.stringify(data.query)}`,
});
});
});
Expand All @@ -225,10 +225,10 @@ class EventEmitter {
blockchain.getRootHash(dcWallet, importId).then((res) => {
data.response.send(res);
}).catch((err) => {
logger.error(`Failed to get root hash for query ${data.query}`);
logger.error(`Failed to get root hash for query ${JSON.stringify(data.query)}`);
notifyError(err);
data.response.status(500);
data.response.send(`Failed to get root hash for query ${data.query}`); // TODO rethink about status codes
data.response.send(`Failed to get root hash for query ${JSON.stringify(data.query)}`); // TODO rethink about status codes
});
});

Expand Down Expand Up @@ -345,7 +345,7 @@ class EventEmitter {
import_id,
root_hash,
total_documents,
wallet,
wallet, // TODO: Sender's wallet is ignored for now.
vertices,
} = response;

Expand All @@ -355,7 +355,7 @@ class EventEmitter {
.create({
import_id,
root_hash,
data_provider_wallet: wallet,
data_provider_wallet: config.node_wallet,
import_timestamp: new Date(),
total_documents,
data_size: dataSize,
Expand Down Expand Up @@ -931,7 +931,7 @@ class EventEmitter {
logger.warn(returnMessage);
return;
}
await dhService.handleDataReadRequest(message);
await dhController.handleDataReadRequestFree(message);
});

// async
Expand All @@ -951,7 +951,7 @@ class EventEmitter {
}

try {
await dvService.handleDataReadResponse(message);
await dvController.handleDataReadResponseFree(message);
} catch (error) {
logger.warn(`Failed to process data read response. ${error}.`);
notifyError(error);
Expand Down
1 change: 1 addition & 0 deletions modules/command/dc/dc-escrow-cancel-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class DCEscrowCancelCommand extends Command {
await this.blockchain.cancelEscrow(
dhWallet,
importId,
false,
);
await this.network.kademlia().sendVerifyImportResponse({
status: 'fail',
Expand Down
167 changes: 167 additions & 0 deletions modules/command/dh/dh-data-read-request-free-command.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
const Models = require('../../../models/index');
const Command = require('../command');

const BN = require('bn.js');
const Utilities = require('../../Utilities');
const ImportUtilities = require('../../ImportUtilities');
const Graph = require('../../Graph');

/**
* Free read request command.
*/
class DHDataReadRequestFreeCommand extends Command {
constructor(ctx) {
super(ctx);
this.logger = ctx.logger;
this.graphStorage = ctx.graphStorage;
this.config = ctx.config;
this.web3 = ctx.web3;
this.network = ctx.network;
this.notifyError = ctx.notifyError;
}

/**
* Executes command and produces one or more events
* @param command
* @param transaction
*/
async execute(command, transaction) {
const {
message,
} = command.data;

/*
message: {
id: REPLY_ID,
import_id: IMPORT_ID,
wallet: DH_WALLET,
nodeId: KAD_ID
}
*/

// TODO in order to avoid getting a different import.
const {
nodeId, wallet, id, import_id,
} = message;
try {
// Check is it mine offer.
const networkReplyModel = await Models.network_replies.find({ where: { id } });
if (!networkReplyModel) {
throw Error(`Couldn't find reply with ID ${id}.`);
}

const offer = networkReplyModel.data;

if (networkReplyModel.receiver_wallet !== wallet &&
networkReplyModel.receiver_identity) {
throw Error('Sorry not your read request');
}

// TODO: Only one import ID used. Later we'll support replication from multiple imports.
// eslint-disable-next-line
const importId = import_id;

const verticesPromise = this.graphStorage.findVerticesByImportId(importId);
const edgesPromise = this.graphStorage.findEdgesByImportId(importId);

const values = await Promise.all([verticesPromise, edgesPromise]);
const vertices = values[0];
const edges = values[1];

ImportUtilities.unpackKeys(vertices, edges);

const dataInfo = await Models.data_info.findOne({
where: {
import_id: importId,
},
});

if (!dataInfo) {
throw Error(`Failed to get data info for import ID ${importId}.`);
}

ImportUtilities.deleteInternal(vertices);

// Get replication key and then encrypt data.
const holdingDataModel = await Models.holding_data.find({ where: { id: importId } });

if (holdingDataModel) {
const holdingData = holdingDataModel.get({ plain: true });
const dataPublicKey = holdingData.data_public_key;
const replicationPrivateKey = holdingData.distribution_private_key;

Graph.decryptVertices(
vertices.filter(vertex => vertex.vertex_type !== 'CLASS'),
dataPublicKey,
);
}

/*
dataReadResponseObject = {
message: {
id: REPLY_ID
wallet: DH_WALLET,
nodeId: KAD_ID
agreementStatus: CONFIRMED/REJECTED,
data_provider_wallet,
encryptedData: { … }
},
messageSignature: {
c: …,
r: …,
s: …
}
}
*/

const replyMessage = {
id,
wallet: this.config.node_wallet,
nodeId: this.config.identity,
data_provider_wallet: dataInfo.data_provider_wallet,
agreementStatus: 'CONFIRMED',
data: {
vertices,
edges,
},
import_id: importId, // TODO: Temporal. Remove it.
};
const dataReadResponseObject = {
message: replyMessage,
messageSignature: Utilities.generateRsvSignature(
JSON.stringify(replyMessage),
this.web3,
this.config.node_private_key,
),
};

await this.network.kademlia().sendDataReadResponse(dataReadResponseObject, nodeId);
} catch (e) {
const errorMessage = `Failed to process data read request. ${e}.`;
this.logger.warn(errorMessage);
this.notifyError(e);
await this.network.kademlia().sendDataReadResponse({
status: 'FAIL',
message: errorMessage,
}, nodeId);
}

return Command.empty();
}

/**
* Builds default command
* @param map
* @returns {{add, data: *, delay: *, deadline: *}}
*/
default(map) {
const command = {
name: 'dhDataReadRequestFreeCommand',
transactional: true,
};
Object.assign(command, map);
return command;
}
}

module.exports = DHDataReadRequestFreeCommand;
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DHOfferReplicationParametersCommand extends Command {
} = command.data;

const encryptedVertices = importResult.vertices.filter(vertex => vertex.vertex_type !== 'CLASS');
ImportUtilities.sort(encryptedVertices, '_dc_key');
ImportUtilities.sort(encryptedVertices);
const litigationBlocks = Challenge.getBlocks(encryptedVertices, 32);
const litigationBlocksMerkleTree = new MerkleTree(litigationBlocks);
const litigationRootHash = litigationBlocksMerkleTree.getRoot();
Expand Down
10 changes: 9 additions & 1 deletion modules/command/dh/dh-read-data-location-request-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ class DHReadDataLocationRequestCommand extends Command {
const nodeId = this.config.identity;
const dataPrice = 100000; // TODO add to configuration

// TODO: Temporarily allow sending raw data (free read).
// Merge with all imports
imports.forEach((importId) => {
if (!replicatedImportIds.includes(importId)) {
replicatedImportIds.push(importId);
}
});

const dataInfos = await Models.data_info.findAll({
where: {
import_id: {
Expand All @@ -83,7 +91,7 @@ class DHReadDataLocationRequestCommand extends Command {
});

if (importObjects.length === 0) {
this.logger.warn(`Zero import size for IDs ${JSON.stringify(replicatedImportIds)}.`);
this.logger.trace(`Didn't find imports for query ${JSON.stringify(msgQuery)}.`);
return Command.empty();
}

Expand Down
8 changes: 8 additions & 0 deletions modules/command/dv/dv-data-read-request-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class DVDataReadRequestCommand extends Command {
}
*/

const dataInfo = await Models.data_info.findOne({
where: { import_id: importId },
});
if (dataInfo) {
this.logger.trace(`I've already stored data for import ID ${importId}. Purchase ignored.`);
return Command.empty();
}

const offer = await Models.network_query_responses.findOne({
where: {
query_id: queryId,
Expand Down
Loading

0 comments on commit 483705f

Please sign in to comment.