diff --git a/modules/Blockchain/Ethereum/Transactions.js b/modules/Blockchain/Ethereum/Transactions.js index 4b1735f0b8..ced7764ac5 100644 --- a/modules/Blockchain/Ethereum/Transactions.js +++ b/modules/Blockchain/Ethereum/Transactions.js @@ -23,8 +23,8 @@ class Transactions { const { transaction, future } = args; try { const delta = (Date.now() - this.lastTransactionTime); - if (delta < 5000) { - await sleep.sleep(5000); + if (delta < 2000) { + await sleep.sleep(2000); } const result = await this._sendTransaction(transaction); if (result.status === '0x0') { diff --git a/modules/Blockchain/Ethereum/index.js b/modules/Blockchain/Ethereum/index.js index 157aa3d71a..735d185c9b 100644 --- a/modules/Blockchain/Ethereum/index.js +++ b/modules/Blockchain/Ethereum/index.js @@ -119,7 +119,7 @@ class Ethereum { const importIdHash = Utilities.sha3(importId); - this.log.notify('Writing root hash to blockchain'); + this.log.notify(`Writing root hash to blockchain for import ${importId}`); return this.transactions.queueTransaction(this.otContractAbi, 'addFingerPrint', [importId, importIdHash, rootHash], options); } @@ -1040,7 +1040,7 @@ class Ethereum { * Get replication modifier */ async getReplicationModifier() { - this.log.trace('Get replication modifier ... '); + this.log.trace('get replication modifier from blockchain'); return this.biddingContract.methods.replication_modifier().call({ from: this.config.wallet_address, }); diff --git a/modules/Challenger.js b/modules/Challenger.js index 060d01f5a1..39dbc748e6 100644 --- a/modules/Challenger.js +++ b/modules/Challenger.js @@ -115,7 +115,7 @@ class Challenger { payload, challenge.dh_id, async (error, response) => { if (error) { - log.warn(`challenge-request: failed to get answer. Error: ${error}.`); + log.warn(`failed to get challenge answer from ${challenge.dh_id}. ${error}.`); return; } diff --git a/modules/DCService.js b/modules/DCService.js index 5425abf085..a929ba5f87 100644 --- a/modules/DCService.js +++ b/modules/DCService.js @@ -139,7 +139,7 @@ class DCService { this.remoteControl.initializingOffer(importId); const profileBalance = - new BN((await this.blockchain.getProfile(config.node_wallet)).balance, 10); + new BN((await this.blockchain.getProfile(config.node_wallet)).balance, 10); const replicationModifier = await this.blockchain.getReplicationModifier(); @@ -165,7 +165,7 @@ class DCService { dhWallets, dhIds, ).then(async () => { - this.log.info('Offer written to blockchain. Started bidding phase.'); + this.log.important(`Offer ${importId} written to blockchain. Started bidding phase.`); this.remoteControl.biddingStarted(importId); offer.status = 'STARTED'; await offer.save({ fields: ['status'] }); @@ -188,38 +188,41 @@ class DCService { return; } - this.log.trace('Started choosing phase.'); - this.remoteControl.biddingComplete(importId); - this.remoteControl.choosingBids(importId); - - offer.status = 'FINALIZING'; - offer.save({ fields: ['status'] }); - this.chooseBids(offer.id, totalEscrowTime).then(() => { - this.blockchain.subscribeToEvent('OfferFinalized', offer.import_id) - .then(() => { - const errorMsg = `Offer for import ${offer.import_id} finalized`; - offer.status = 'FINALIZED'; - this.remoteControl.bidChosen(importId); - this.remoteControl.offerFinalized(`Offer for import ${offer.import_id} finalized`, importId); - offer.message = errorMsg; - offer.save({ fields: ['status', 'message'] }); - this.log.info(errorMsg); - }).catch((error) => { - const errorMsg = `Failed to get offer for import ${offer.import_id}). ${error}.`; - offer.status = 'FAILED'; - offer.message = errorMsg; - offer.save({ fields: ['status', 'message'] }); - this.log.error(errorMsg); - this.remoteControl.dcErrorHandling(errorMsg); - }); - }).catch((err) => { - const errorMsg = `Failed to choose bids. ${err}`; - offer.status = 'FAILED'; - offer.message = errorMsg; - offer.save({ fields: ['status', 'message'] }); - this.log.error(errorMsg); - this.remoteControl.dcErrorHandling(errorMsg); - }); + setTimeout(() => { + this.log.trace('Started choosing phase.'); + this.remoteControl.biddingComplete(importId); + this.remoteControl.choosingBids(importId); + + offer.status = 'FINALIZING'; + offer.save({ fields: ['status'] }); + + this.chooseBids(offer.id, totalEscrowTime).then(() => { + this.blockchain.subscribeToEvent('OfferFinalized', offer.import_id) + .then(() => { + const errorMsg = `Offer for import ${offer.import_id} finalized`; + offer.status = 'FINALIZED'; + this.remoteControl.bidChosen(importId); + this.remoteControl.offerFinalized(`Offer for import ${offer.import_id} finalized`, importId); + offer.message = errorMsg; + offer.save({ fields: ['status', 'message'] }); + this.log.info(errorMsg); + }).catch((error) => { + const errorMsg = `Failed to get offer for import ${offer.import_id}). ${error}.`; + offer.status = 'FAILED'; + offer.message = errorMsg; + offer.save({ fields: ['status', 'message'] }); + this.log.error(errorMsg); + this.remoteControl.dcErrorHandling(errorMsg); + }); + }).catch((err) => { + const errorMsg = `Failed to choose bids. ${err}`; + offer.status = 'FAILED'; + offer.message = errorMsg; + offer.save({ fields: ['status', 'message'] }); + this.log.error(errorMsg); + this.remoteControl.dcErrorHandling(errorMsg); + }); + }, 30000); }); }).catch((err) => { const errorMsg = `Failed to create offer. ${err}.`; @@ -264,25 +267,19 @@ class DCService { /** * Chose DHs * @param offerId Offer identifier - * @param totalEscrowTime Total escrow time + * @param totalEscrowTime Total escrow time */ chooseBids(offerId, totalEscrowTime) { return new Promise((resolve, reject) => { Models.offers.findOne({ where: { id: offerId } }).then((offerModel) => { const offer = offerModel.get({ plain: true }); this.log.info(`Choose bids for offer ID ${offerId}, import ID ${offer.import_id}.`); - this.blockchain.increaseApproval(offer.max_token_amount * offer.replication_number) + this.blockchain.chooseBids(offer.import_id) .then(() => { - this.blockchain.chooseBids(offer.import_id) - .then(() => { - this.log.info(`Bids chosen for offer ID ${offerId}, import ID ${offer.import_id}.`); - resolve(); - }).catch((err) => { - this.log.warn(`Failed call choose bids for offer ID ${offerId}, import ID ${offer.import_id}. ${err}`); - reject(err); - }); + this.log.info(`Bids chosen for offer ID ${offerId}, import ID ${offer.import_id}.`); + resolve(); }).catch((err) => { - this.log.warn(`Failed to increase allowance. ${JSON.stringify(err)}`); + this.log.warn(`Failed call choose bids for offer ID ${offerId}, import ID ${offer.import_id}. ${err}`); reject(err); }); }).catch((err) => { @@ -370,7 +367,10 @@ class DCService { importId, kadWallet, ); - this.log.important('Data successfully verified, preparing to start challenges'); + this.log.important(`Holding data for offer ${importId} and contact ${kadWallet} successfully verified. Challenges taking place...`); + + replicatedData.status = 'ACTIVE'; + await replicatedData.save({ fields: ['status'] }); await this.network.kademlia().sendVerifyImportResponse({ status: 'success', diff --git a/modules/DHService.js b/modules/DHService.js index 222b1db368..6bae3a96ce 100644 --- a/modules/DHService.js +++ b/modules/DHService.js @@ -48,20 +48,20 @@ class DHService { predeterminedBid, ) { try { + dcNodeId = dcNodeId.substring(2, 42); + const dcContact = await this.network.kademlia().getContact(dcNodeId, true); + if (dcContact == null || dcContact.hostname == null) { + // wait until peers are synced + return; + } + // Check if mine offer and if so ignore it. const offerModel = await Models.offers.findOne({ where: { import_id: importId } }); if (offerModel) { - const offer = offerModel.get({ plain: true }); - this.log.trace(`Mine offer (ID ${offer.data_hash}). Offer ignored`); return; } - dcNodeId = dcNodeId.substring(2, 42); - const dcContact = await this.network.kademlia().getContact(dcNodeId, true); - if (dcContact == null || dcContact.hostname == null) { - this.log.trace(`Unknown DC contact ${dcNodeId} for import ${importId}. Offer ignored.`); - return; - } + this.log.info(`New offer has been created by ${dcNodeId}. Offer ID ${importId}.`); const distanceParams = await this.blockchain.getDistanceParameters(importId); @@ -71,7 +71,7 @@ class DHService { const k = distanceParams[4]; const numNodes = distanceParams[5]; - if (this.amIClose(k, numNodes, dataHash, nodeHash, 20000)) { + if (this.amIClose(k, numNodes, dataHash, nodeHash, 10000)) { this.log.notify('Close enough to take bid'); } else { this.log.notify('Not close enough to take bid'); @@ -180,8 +180,13 @@ class DHService { } if (!predeterminedBid) { - await this.blockchain.addBid(importId, this.config.identity); - bidEvent = await this.blockchain.subscribeToEvent('AddedBid', importId); + try { + await this.blockchain.addBid(importId, this.config.identity); + bidEvent = await this.blockchain.subscribeToEvent('AddedBid', importId); + } catch (err) { + this.log.info('Bid not added, your bid was probably too late and the offer has been closed'); + return; + } } else { const myBidIndex = await this.blockchain.getBidIndex( importId, @@ -249,7 +254,7 @@ class DHService { }, bid.dc_id, (err) => { if (err) { - this.log.warn(`Failed to send replication request ${err}`); + this.log.warn(`Failed to send replication request to ${bid.dc_id}. ${err}`); // TODO Cancel bid here. this.remoteControl.replicationReqestFailed(`Failed to send replication request ${err}`); } diff --git a/modules/DataReplication.js b/modules/DataReplication.js index db69889428..24ccfc1a22 100644 --- a/modules/DataReplication.js +++ b/modules/DataReplication.js @@ -25,22 +25,20 @@ class DataReplication { * @return object response */ async sendPayload(data) { - this.log.info('Entering sendPayload'); - const currentUnixTime = Date.now(); const options = { dh_wallet: config.dh_wallet, import_id: data.import_id, amount: data.vertices.length + data.edges.length, start_time: currentUnixTime, - total_time: 10 * 60000, + total_time: parseInt(config.total_escrow_time_in_milliseconds, 10), // TODO introduce BN }; ImportUtilities.sort(data.vertices); // TODO: Move test generation outside sendPayload(. const tests = Challenge.generateTests( - data.contact, options.import_id.toString(), 10, + data.contact, options.import_id.toString(), 20, options.start_time, options.start_time + options.total_time, 32, data.vertices, ); diff --git a/modules/EventEmitter.js b/modules/EventEmitter.js index a8ad28b3c5..ed1737c707 100644 --- a/modules/EventEmitter.js +++ b/modules/EventEmitter.js @@ -20,7 +20,7 @@ class EventEmitter { this.graphStorage = ctx.graphStorage; this._MAPPINGS = {}; - this._MAX_LISTENERS = 10; // limits the number of listeners in order to detect memory leaks + this._MAX_LISTENERS = 15; // limits the number of listeners in order to detect memory leaks } /** @@ -91,6 +91,7 @@ class EventEmitter { product, logger, remoteControl, + config, } = this.ctx; this._on('api-import-request', (data) => { @@ -225,6 +226,13 @@ class EventEmitter { this._on('api-network-query', (data) => { logger.info(`Network query handling triggered with query ID ${data.query}`); + if (!config.enoughFunds) { + data.response.status(400); + data.response.send({ + message: 'Insufficient funds', + }); + return; + } dvService.queryNetwork(data.query) .then((queryId) => { data.response.status(201); @@ -245,6 +253,9 @@ class EventEmitter { }); this._on('api-choose-offer', async (data) => { + if (!config.enoughFunds) { + return; + } const failFunction = (error) => { logger.warn(error); data.response.status(400); @@ -390,6 +401,13 @@ class EventEmitter { }); this._on('api-create-offer', async (data) => { + if (!config.enoughFunds) { + data.response.status(400); + data.response.send({ + message: 'Insufficient funds', + }); + return; + } const { import_id, total_escrow_time, @@ -399,9 +417,7 @@ class EventEmitter { } = data; try { - logger.info(`Create offer triggered with import_id ${import_id}, - total_escrow_time ${total_escrow_time}, max_token_amount ${max_token_amount}, - min_stake_amount ${min_stake_amount} and min_reputation ${min_reputation}.`); + logger.info(`Preparing to create offer for import ${import_id}`); let vertices = await this.graphStorage.findVerticesByImportId(import_id); vertices = vertices.map((vertex) => { delete vertex.private; @@ -441,7 +457,7 @@ class EventEmitter { this._on('api-gs1-import-request', async (data) => { try { - logger.info(`Gs1 import with ${data.filepath} triggered.`); + logger.info(`GS1 import with ${data.filepath} triggered.`); const responseObject = await importer.importXMLgs1(data.filepath); const { error } = responseObject; const { response } = responseObject; @@ -458,7 +474,7 @@ class EventEmitter { this._on('api-wot-import-request', async (data) => { try { - logger.info(`Wot import with ${data.filepath} triggered.`); + logger.info(`WOT import with ${data.filepath} triggered.`); const responseObject = await importer.importWOT(data.filepath); const { error } = responseObject; const { response } = responseObject; @@ -482,14 +498,13 @@ class EventEmitter { const { dhService, logger, - blockchain, config, - remoteControl, } = this.ctx; this._on('eth-OfferCreated', async (eventData) => { - logger.info(`New offer has been created by ${eventData.DC_node_id} in OriginTrail decentralized network.`); - + if (!config.enoughFunds) { + return; + } const { import_id, DC_node_id, @@ -515,6 +530,9 @@ class EventEmitter { }); this._on('eth-AddedPredeterminedBid', async (eventData) => { + if (!config.enoughFunds) { + return; + } const { import_id, DH_wallet, @@ -612,7 +630,6 @@ class EventEmitter { }); this._on('eth-EscrowVerified', async (eventData) => { - logger.trace('Received eth-EscrowVerified'); const { import_id, DH_wallet, @@ -620,6 +637,7 @@ class EventEmitter { if (config.node_wallet === DH_wallet) { // Event is for me. + logger.trace(`Escrow for import ${import_id} verified`); try { // TODO: Possible race condition if another bid for same import came meanwhile. const bid = await Models.bids.findOne({ @@ -686,12 +704,12 @@ class EventEmitter { // sync this._on('kad-replication-request', async (request) => { - logger.info('Request for replication of data received'); - const { import_id, wallet } = request.params.message; const { wallet: kadWallet } = request.contact[1]; const kadIdentity = request.contact[0]; + logger.info(`Request for replication of ${import_id} received. Sender ${kadIdentity}`); + if (!import_id || !wallet) { logger.warn('Asked replication without providing import ID or wallet.'); return; @@ -755,12 +773,12 @@ class EventEmitter { offer_id: offer.id, data_private_key: keyPair.privateKey, data_public_key: keyPair.publicKey, - status: 'ACTIVE', + status: 'PENDING', }); const dataInfo = Models.data_info.find({ where: { import_id } }); - logger.info('[DC] Preparing to enter sendPayload'); + logger.info(`Preparing to send payload for ${import_id} to ${kadIdentity}`); const data = { contact: kadIdentity, vertices, @@ -773,7 +791,7 @@ class EventEmitter { }; dataReplication.sendPayload(data).then(() => { - logger.info(`[DC] Payload sent. Replication ID ${replicatedData.id}.`); + logger.info(`Payload for ${import_id} sent to ${kadIdentity}.`); }).catch((error) => { logger.warn(`Failed to send payload to ${kadIdentity}. Replication ID ${replicatedData.id}. ${error}`); }); diff --git a/modules/Network.js b/modules/Network.js index 98b6d068b1..dafc0e252f 100644 --- a/modules/Network.js +++ b/modules/Network.js @@ -25,7 +25,7 @@ class Network { this.emitter = ctx.emitter; this.networkUtilities = ctx.networkUtilities; - kadence.constants.T_RESPONSETIMEOUT = 20000; + kadence.constants.T_RESPONSETIMEOUT = 40000; if (parseInt(config.test_network, 10)) { this.log.warn('Node is running in test mode, difficulties are reduced'); process.env.kadence_TestNetworkEnabled = config.test_network; @@ -162,11 +162,17 @@ class Network { virtualPort: config.onion_virtual_port, localMapping: `127.0.0.1:${config.node_port}`, torrcEntries: { - CircuitBuildTimeout: 10, - KeepalivePeriod: 60, - NewCircuitPeriod: 60, - NumEntryGuards: 8, - Log: 'notice stdout', + LearnCircuitBuildTimeout: 0, + CircuitBuildTimeout: 40, + CircuitStreamTimeout: 30, + MaxCircuitDirtiness: 7200, + MaxClientCircuitsPending: 1024, + SocksTimeout: 41, + CloseHSClientCircuitsImmediatelyOnTimeout: 1, + CloseHSServiceRendCircuitsImmediatelyOnTimeout: 1, + SafeLogging: 0, + FetchDirInfoEarly: 1, + FetchDirInfoExtraEarly: 1, }, passthroughLoggingEnabled: 1, })); @@ -189,13 +195,12 @@ class Network { if (utilities.isBootstrapNode()) { this.log.info(`Found ${bootstrapNodes.length} provided bootstrap node(s). Running as a Bootstrap node`); this.log.info(`Found additional ${peers.length} peers in peer cache`); - this.log.info(`Trying to contact ${nodes.length} peers`); } else { this.log.info(`Found ${bootstrapNodes.length} provided bootstrap node(s)`); this.log.info(`Found additional ${peers.length} peers in peer cache`); - this.log.info(`Trying to join the network from ${nodes.length} unique seeds`); } + this.log.info(`Sync with network from ${nodes.length} unique peers`); if (nodes.length === 0) { this.log.info('No bootstrap seeds provided and no known profiles'); this.log.info('Running in seed mode (waiting for connections)'); @@ -214,7 +219,7 @@ class Network { const func = url => new Promise((resolve, reject) => { try { - this.log.info(`Joining via ${url}`); + this.log.info(`Syncing with peers via ${url}.`); const contact = kadence.utils.parseContactURL(url); this._join(contact, (err) => { @@ -247,11 +252,7 @@ class Network { } if (result) { - this.log.important('Joined the network'); - const contact = kadence.utils.parseContactURL(result); - - this.log.info(`Connected to network via ${contact[0]} (http://${contact[1].hostname}:${contact[1].port})`); - this.log.info(`Discovered ${this.node.router.size} peers from seed`); + this.log.important('Initial sync with other peers done'); setTimeout(() => { this.node.refresh(this.node.router.getClosestBucket() + 1); diff --git a/ot-node.js b/ot-node.js index 299e329827..09c357baa5 100644 --- a/ot-node.js +++ b/ot-node.js @@ -47,6 +47,45 @@ process.on('unhandledRejection', (reason, p) => { * Main node object */ class OTNode { + async getBalances(Utilities, selectedBlockchain, web3, config, initial) { + let enoughETH = false; + let enoughtTRAC = false; + try { + const etherBalance = await Utilities.getBalanceInEthers( + web3, + selectedBlockchain.wallet_address, + ); + if (etherBalance <= 0) { + console.log('Please get some ETH in the node wallet fore running ot-node'); + enoughETH = false; + if (initial) { + process.exit(1); + } + } else { + enoughETH = true; + log.info(`Balance of ETH: ${etherBalance}`); + } + + const atracBalance = await Utilities.getAlphaTracTokenBalance( + web3, + selectedBlockchain.wallet_address, + selectedBlockchain.token_contract_address, + ); + if (atracBalance <= 0) { + enoughtTRAC = false; + console.log('Please get some ATRAC in the node wallet fore running ot-node'); + if (initial) { + process.exit(1); + } + } else { + enoughtTRAC = true; + log.info(`Balance of ATRAC: ${atracBalance}`); + } + } catch (error) { + console.log(error); + } + config.enoughFunds = enoughETH && enoughtTRAC; + } /** * OriginTrail node system bootstrap function */ @@ -140,36 +179,12 @@ class OTNode { // check does node_wallet has sufficient Ether and ATRAC tokens if (process.env.NODE_ENV !== 'test') { - try { - const etherBalance = await Utilities.getBalanceInEthers( - web3, - selectedBlockchain.wallet_address, - ); - if (etherBalance <= 0) { - console.log('Please get some ETH in the node wallet before running ot-node'); - process.exit(1); - } else { - ( - log.info(`Initial balance of ETH: ${etherBalance}`) - ); - } - - const atracBalance = await Utilities.getAlphaTracTokenBalance( - web3, - selectedBlockchain.wallet_address, - selectedBlockchain.token_contract_address, - ); - if (atracBalance <= 0) { - console.log('Please get some ATRAC in the node wallet before running ot-node'); - process.exit(1); - } else { - ( - log.info(`Initial balance of ATRAC: ${atracBalance}`) - ); - } - } catch (error) { - console.log(error); - } + await this.getBalances(Utilities, selectedBlockchain, web3, config, true); + setInterval(async () => { + await this.getBalances(Utilities, selectedBlockchain, web3, config); + }, 300000); + } else { + config.enoughFunds = true; } // Create the container and set the injectionMode to PROXY (which is also the default).