diff --git a/config/config.json b/config/config.json index bc703ceaa3..20616b0895 100644 --- a/config/config.json +++ b/config/config.json @@ -125,6 +125,7 @@ "config": { "blockchainTitle": "ganache", "networkId": "ganache::testnet", + "identityFileName": "ganache", "hubContractAddress": "0x209679fA3B658Cd0fC74473aF28243bfe78a9b12", "rpcEndpoints": ["http://localhost:7545"], "evmManagementPublicKey": "0x1B420da5f7Be66567526E32bc68ab29F1A63765A" @@ -134,7 +135,8 @@ "package": "./blockchain/implementation/ot-parachain/ot-parachain-service.js", "config": { "networkId": "otp::testnet", - "hubContractAddress": "0x256736AEb3f19AC6738E9F4D10C9B61da71CEB9F", + "identityFileName": "otp", + "hubContractAddress": "0x707233a55bD035C6Bc732196CA4dbffa63CbA169", "rpcEndpoints": ["wss://lofar.origin-trail.network"] } }, @@ -142,6 +144,7 @@ "package": "./blockchain/implementation/polygon/polygon-service.js", "config": { "networkId": "polygon::testnet", + "identityFileName": "polygon", "hubContractAddress": "0xdaa16AC171CfE8Df6F79C06E7EEAb2249E2C9Ec8", "gasPriceOracleLink": "https://gasstation-mumbai.matic.today/v2", "rpcEndpoints": [ @@ -156,6 +159,7 @@ "package": "./blockchain/implementation/polygon/eth-service.js", "config": { "networkId": "eth::rinkeby", + "identityFileName": "rinkeby", "hubContractAddress": "", "gasPriceOracleLink": "", "rpcEndpoints": [] @@ -261,6 +265,7 @@ "package": "./blockchain/implementation/ganache/ganache-service.js", "config": { "blockchainTitle": "ganache", + "identityFileName": "ganache", "networkId": "ganache::testnet", "hubContractAddress": "0x209679fA3B658Cd0fC74473aF28243bfe78a9b12", "rpcEndpoints": ["http://localhost:7545"] @@ -409,7 +414,8 @@ "package": "./blockchain/implementation/ot-parachain/ot-parachain-service.js", "config": { "networkId": "parachain::testnet", - "hubContractAddress": "0x256736AEb3f19AC6738E9F4D10C9B61da71CEB9F", + "hubContractAddress": "0xc9184C1A0CE150a882DC3151Def25075bdAf069C", + "identityFileName": "otp", "rpcEndpoints": ["wss://lofar.origin-trail.network"] } }, @@ -417,6 +423,7 @@ "package": "./blockchain/implementation/polygon/polygon-service.js", "config": { "networkId": "polygon::testnet", + "identityFileName": "polygon", "hubContractAddress": "0xdaa16AC171CfE8Df6F79C06E7EEAb2249E2C9Ec8", "gasPriceOracleLink": "https://gasstation-mumbai.matic.today/v2", "rpcEndpoints": [ @@ -588,6 +595,7 @@ "package": "./blockchain/implementation/ot-parachain/ot-parachain-service.js", "config": { "networkId": "otp::mainnet", + "identityFileName": "otp", "hubContractAddress": "", "rpcEndpoints": [] } @@ -596,6 +604,7 @@ "package": "./blockchain/implementation/polygon/polygon-service.js", "config": { "networkId": "polygon::mainnet", + "identityFileName": "polygon", "hubContractAddress": "", "gasPriceOracleLink": "", "rpcEndpoints": [] diff --git a/ot-node.js b/ot-node.js index 3819489b01..d3b54fc285 100644 --- a/ot-node.js +++ b/ot-node.js @@ -1,17 +1,15 @@ import DeepExtend from 'deep-extend'; import rc from 'rc'; -import fs from 'fs'; -import appRootPath from 'app-root-path'; -import path from 'path'; import EventEmitter from 'events'; import { createRequire } from 'module'; import DependencyInjection from './src/service/dependency-injection.js'; import Logger from './src/logger/logger.js'; -import { MIN_NODE_VERSION } from './src/constants/constants.js'; +import { CONTRACTS, MIN_NODE_VERSION } from './src/constants/constants.js'; import FileService from './src/service/file-service.js'; import NetworkPrivateKeyMigration from './src/migration/network-private-key-migration.js'; import OtnodeUpdateCommand from './src/commands/common/otnode-update-command.js'; import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-updater.js'; +import BlockchainIdentityMigration from './src/migration/blockchain-identity-migration.js'; const require = createRequire(import.meta.url); const pjson = require('./package.json'); @@ -47,14 +45,14 @@ class OTNode { this.initializeEventEmitter(); await this.initializeModules(); - await this.saveNetworkModulePeerIdAndPrivKey(); await this.createProfiles(); await this.initializeCommandExecutor(); + await this.initializeShardingTableService(); await this.initializeTelemetryInjectionService(); await this.initializeRouters(); - + await this.startListeningOnBlockchainEvents(); this.logger.info('Node is up and running!'); } @@ -162,34 +160,26 @@ class OTNode { async createProfiles() { const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); const createProfilesPromises = blockchainModuleManager - .getImplementationsNames() + .getImplementationNames() .map(async (blockchain) => { try { if (!blockchainModuleManager.identityExists(blockchain)) { this.logger.info(`Creating blockchain identity on network: ${blockchain}`); - const networkModuleManager = this.container.resolve('networkModuleManager'); - const peerId = networkModuleManager.getPeerId(); await blockchainModuleManager.deployIdentity(blockchain); + await blockchainModuleManager.saveIdentityInFile(); + } + const identity = blockchainModuleManager.getIdentity(blockchain); + this.logger.info(`${blockchain} blockchain identity is ${identity}`); + + if (!(await blockchainModuleManager.profileExists(blockchain, identity))) { this.logger.info(`Creating profile on network: ${blockchain}`); + const networkModuleManager = this.container.resolve('networkModuleManager'); + const peerId = networkModuleManager.getPeerId().toB58String(); await blockchainModuleManager.createProfile(blockchain, peerId); - if ( - process.env.NODE_ENV !== 'development' && - process.env.NODE_ENV !== 'test' - ) { - await this.saveIdentityInUserConfigurationFile( - blockchainModuleManager.getIdentity(blockchain), - blockchain, - ); - } } - this.logger.info( - `${blockchain} blockchain identity is ${blockchainModuleManager.getIdentity( - blockchain, - )}`, - ); } catch (error) { this.logger.warn( - `Unable to create ${blockchain} blockchain profile. Removing implementation.`, + `Unable to create ${blockchain} blockchain profile. Removing implementation. Error: ${error.message}`, ); blockchainModuleManager.removeImplementation(blockchain); } @@ -197,21 +187,12 @@ class OTNode { await Promise.all(createProfilesPromises); - if (!blockchainModuleManager.getImplementationsNames().length) { - this.logger.info(`Unable to create blockchain profiles. OT-node shutting down...`); + if (!blockchainModuleManager.getImplementationNames().length) { + this.logger.error(`Unable to create blockchain profiles. OT-node shutting down...`); this.stop(1); } } - async saveNetworkModulePeerIdAndPrivKey() { - const networkModuleManager = this.container.resolve('networkModuleManager'); - const privateKey = networkModuleManager.getPrivateKey(); - - if (process.env.NODE_ENV !== 'development' && process.env.NODE_ENV !== 'test') { - await this.savePrivateKeyAndPeerIdInUserConfigurationFile(privateKey); - } - } - async initializeCommandExecutor() { try { const commandExecutor = this.container.resolve('commandExecutor'); @@ -226,6 +207,85 @@ class OTNode { } } + async initializeShardingTableService() { + const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); + const initShardingServices = blockchainModuleManager + .getImplementationNames() + .map(async (blockchain) => { + try { + const shardingTableService = this.container.resolve('shardingTableService'); + shardingTableService.initialize(blockchain); + this.logger.info( + `Sharding Table Service initialized successfully for '${blockchain}' blockchain`, + ); + } catch (e) { + this.logger.error( + `Sharding table service initialization for '${blockchain}' blockchain failed. + Error message: ${e.message}`, + ); + blockchainModuleManager.removeImplementation(blockchain); + } + }); + await Promise.all(initShardingServices); + + if (!blockchainModuleManager.getImplementationNames().length) { + this.logger.error( + `Unable to initialize sharding table service. OT-node shutting down...`, + ); + this.stop(1); + } + } + + async startListeningOnBlockchainEvents() { + this.logger.info('Starting blockchain event listener'); + const blockchainModuleManager = this.container.resolve('blockchainModuleManager'); + const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); + const eventEmitter = this.container.resolve('eventEmitter'); + + const onEventsReceived = async (events) => { + if (events.length > 0) { + const insertedEvents = await repositoryModuleManager.insertBlockchainEvents(events); + insertedEvents.forEach((event) => { + if (event) { + const eventName = `${event.blockchain_id}-${event.event}`; + eventEmitter.emit(eventName, event); + } + }); + } + }; + + const getLastCheckedBlock = async (blockchainId, contract) => + repositoryModuleManager.getLastCheckedBlock(blockchainId, contract); + + const updateLastCheckedBlock = async (blockchainId, currentBlock, timestamp, contract) => + repositoryModuleManager.updateLastCheckedBlock( + blockchainId, + currentBlock, + timestamp, + contract, + ); + + let working = false; + + setInterval(async () => { + if (!working) { + try { + working = true; + await blockchainModuleManager.getAllPastEvents( + CONTRACTS.SHARDING_TABLE_CONTRACT, + onEventsReceived, + getLastCheckedBlock, + updateLastCheckedBlock, + ); + } catch (e) { + this.logger.error(`Failed to get blockchain events. Error: ${e}`); + } finally { + working = false; + } + } + }, 10 * 1000); + } + async initializeTelemetryInjectionService() { if (this.config.telemetry.enabled) { try { @@ -242,55 +302,6 @@ class OTNode { } } - async savePrivateKeyAndPeerIdInUserConfigurationFile(privateKey) { - const configurationFilePath = path.join(appRootPath.path, '..', this.config.configFilename); - const configFile = JSON.parse(await fs.promises.readFile(configurationFilePath)); - - if (!configFile.modules.network) { - configFile.modules.network = { - implementation: { - 'libp2p-service': { - config: {}, - }, - }, - }; - } else if (!configFile.modules.network.implementation) { - configFile.modules.network.implementation = { - 'libp2p-service': { - config: {}, - }, - }; - } else if (!configFile.modules.network.implementation['libp2p-service']) { - configFile.modules.network.implementation['libp2p-service'] = { - config: {}, - }; - } - if (!configFile.modules.network.implementation['libp2p-service'].config.privateKey) { - configFile.modules.network.implementation['libp2p-service'].config.privateKey = - privateKey; - await fs.promises.writeFile(configurationFilePath, JSON.stringify(configFile, null, 2)); - } - } - - async saveIdentityInUserConfigurationFile(identity, blockchain) { - const configurationFilePath = path.join(appRootPath.path, '..', this.config.configFilename); - const configFile = JSON.parse(await fs.promises.readFile(configurationFilePath)); - if ( - configFile.modules.blockchain && - configFile.modules.blockchain.implementation && - configFile.modules.blockchain.implementation[blockchain] && - configFile.modules.blockchain.implementation[blockchain].config - ) { - if (!configFile.modules.blockchain.implementation[blockchain].config.identity) { - configFile.modules.blockchain.implementation[blockchain].config.identity = identity; - await fs.promises.writeFile( - configurationFilePath, - JSON.stringify(configFile, null, 2), - ); - } - } - } - async removeUpdateFile() { const updateFilePath = this.fileService.getUpdateFilePath(); await this.fileService.removeFile(updateFilePath).catch((error) => { @@ -308,6 +319,15 @@ class OTNode { if (!(await networkPrivateKeyMigration.migrationAlreadyExecuted())) { await networkPrivateKeyMigration.migrate(); } + + const blockchainIdentityMigration = new BlockchainIdentityMigration( + 'BlockchainIdentityMigration', + this.logger, + this.config, + ); + if (!(await blockchainIdentityMigration.migrationAlreadyExecuted())) { + await blockchainIdentityMigration.migrate(); + } } async checkForUpdate() { diff --git a/package-lock.json b/package-lock.json index f4b30c3cc8..af467f4c23 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.2.2.8", + "version": "6.0.0-beta.2.3.3", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.0.0-beta.2.2.8", + "version": "6.0.0-beta.2.3.3", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", @@ -19,7 +19,7 @@ "axios": "^0.27.2", "cors": "^2.8.5", "deep-extend": "^0.6.0", - "dkg-evm-module": "^1.0.3", + "dkg-evm-module": "^1.1.2", "dotenv": "^16.0.1", "express": "^4.18.1", "express-fileupload": "^1.4.0", @@ -46,7 +46,6 @@ "libp2p-tcp": "^0.17.2", "merkletreejs": "^0.2.32", "ms": "^2.1.3", - "multiaddr": "^10.0.0", "multiformats": "^9.8.1", "mysql2": "^2.3.3", "p-iteration": "^1.1.8", @@ -7822,9 +7821,9 @@ } }, "node_modules/dkg-evm-module": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/dkg-evm-module/-/dkg-evm-module-1.0.3.tgz", - "integrity": "sha512-HM/hCW6ZlAI7AwLCrQZSdbvMmHiU+CL9caRVh2qTjR1UJGrKLAD13CfLVe1k+ZbIZxPK/P/I2s3KWm1OfZ8cDA==" + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/dkg-evm-module/-/dkg-evm-module-1.1.2.tgz", + "integrity": "sha512-m8zsu4uFH5/BcGCDhEjG1rKBvc7AAPQRjfxP99YcIKI4doohLfI5JJdcpHqw7Rawe1/yo9A+NPIHptBGBjvoYg==" }, "node_modules/dkg.js": { "version": "6.0.0-beta.3.1.2", @@ -26249,9 +26248,9 @@ } }, "dkg-evm-module": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/dkg-evm-module/-/dkg-evm-module-1.0.3.tgz", - "integrity": "sha512-HM/hCW6ZlAI7AwLCrQZSdbvMmHiU+CL9caRVh2qTjR1UJGrKLAD13CfLVe1k+ZbIZxPK/P/I2s3KWm1OfZ8cDA==" + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/dkg-evm-module/-/dkg-evm-module-1.1.2.tgz", + "integrity": "sha512-m8zsu4uFH5/BcGCDhEjG1rKBvc7AAPQRjfxP99YcIKI4doohLfI5JJdcpHqw7Rawe1/yo9A+NPIHptBGBjvoYg==" }, "dkg.js": { "version": "6.0.0-beta.3.1.2", diff --git a/package.json b/package.json index 72ca20c889..339bdb8e01 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.0-beta.2.2.9", + "version": "6.0.0-beta.3.0.0", "description": "OTNode v6 Beta 2", "main": "index.js", "type": "module", @@ -71,7 +71,7 @@ "axios": "^0.27.2", "cors": "^2.8.5", "deep-extend": "^0.6.0", - "dkg-evm-module": "^1.0.3", + "dkg-evm-module": "^1.1.2", "dotenv": "^16.0.1", "express": "^4.18.1", "express-fileupload": "^1.4.0", @@ -98,7 +98,6 @@ "libp2p-tcp": "^0.17.2", "merkletreejs": "^0.2.32", "ms": "^2.1.3", - "multiaddr": "^10.0.0", "multiformats": "^9.8.1", "mysql2": "^2.3.3", "p-iteration": "^1.1.8", diff --git a/src/commands/common/dial-peers-command.js b/src/commands/common/dial-peers-command.js new file mode 100644 index 0000000000..e8f5bd91fe --- /dev/null +++ b/src/commands/common/dial-peers-command.js @@ -0,0 +1,60 @@ +import Command from '../command.js'; +import { + DIAL_PEERS_COMMAND_FREQUENCY_MILLS, + DIAL_PEERS_CONCURRENCY, +} from '../../constants/constants.js'; + +class DialPeersCommand extends Command { + constructor(ctx) { + super(ctx); + this.shardingTableService = ctx.shardingTableService; + this.repositoryModuleManager = ctx.repositoryModuleManager; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute() { + const peersToDial = await this.repositoryModuleManager.getPeersToDial( + DIAL_PEERS_CONCURRENCY, + ); + + if (peersToDial.length) { + this.logger.trace(`Dialing ${peersToDial.length} remote peers`); + await Promise.all( + peersToDial.map(({ peer_id: peerId }) => this.shardingTableService.dial(peerId)), + ); + } + + return Command.repeat(); + } + + /** + * Recover system from failure + * @param command + * @param error + */ + async recover(command, error) { + this.logger.warn(`Failed to dial peers: error: ${error.message}`); + return Command.repeat(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'dialPeersCommand', + data: {}, + period: DIAL_PEERS_COMMAND_FREQUENCY_MILLS, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +export default DialPeersCommand; diff --git a/src/commands/common/send-telemetry-command.js b/src/commands/common/send-telemetry-command.js index 2108ec510d..7562b8e132 100644 --- a/src/commands/common/send-telemetry-command.js +++ b/src/commands/common/send-telemetry-command.js @@ -30,7 +30,7 @@ class SendTelemetryCommand extends Command { const signalingMessage = { nodeData: { version: pjson.version, - identity: this.networkModuleManager.getPeerId()._idB58String, + identity: this.networkModuleManager.getPeerId().toB58String(), hostname: this.config.hostname, operational_wallet: this.blockchainModuleManager.getPublicKey(), management_wallet: this.blockchainModuleManager.getManagementKey(), diff --git a/src/commands/protocols/common/find-nodes-command.js b/src/commands/protocols/common/find-nodes-command.js index cf0ffc9d79..895a8aa564 100644 --- a/src/commands/protocols/common/find-nodes-command.js +++ b/src/commands/protocols/common/find-nodes-command.js @@ -5,6 +5,7 @@ class FindNodesCommand extends Command { constructor(ctx) { super(ctx); this.networkModuleManager = ctx.networkModuleManager; + this.shardingTableService = ctx.shardingTableService; } /** @@ -12,23 +13,34 @@ class FindNodesCommand extends Command { * @param command */ async execute(command) { - const { keyword, operationId, minimumAckResponses, errorType, networkProtocols } = - command.data; + const { + keyword, + operationId, + blockchain, + minimumAckResponses, + errorType, + networkProtocols, + } = command.data; this.errorType = errorType; this.logger.debug(`Searching for closest node(s) for keyword ${keyword}`); + // TODO: protocol selection const closestNodes = []; - for (const node of await this.findNodes(keyword, operationId)) { - for (const protocol of networkProtocols) { - if (node.protocols.includes(protocol)) { - closestNodes.push({ id: node.id, protocol }); - break; - } + for (const node of await this.findNodes(keyword, operationId, blockchain)) { + if (node.id !== this.networkModuleManager.getPeerId().toB58String()) { + closestNodes.push({ id: node.id, protocol: networkProtocols[0] }); } } this.logger.debug(`Found ${closestNodes.length} node(s) for keyword ${keyword}`); + this.logger.trace( + `Found neighbourhood: ${JSON.stringify( + closestNodes.map((node) => node.id), + null, + 2, + )}`, + ); const batchSize = 2 * minimumAckResponses; if (closestNodes.length < batchSize) { @@ -46,25 +58,37 @@ class FindNodesCommand extends Command { ...command.data, batchSize, leftoverNodes: closestNodes, + numberOfFoundNodes: closestNodes.length, }, command.sequence, ); } - async findNodes(keyword, operationId) { + async findNodes(keyword, operationId, blockchainId) { await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.FIND_NODES_START, ); - const closestNodes = await this.networkModuleManager.findNodes(keyword); + // todo r2 hardcoded to 20, + const closestNodes = await this.shardingTableService.findNeighbourhood( + keyword, + blockchainId, + 20, + ); + + const nodesFound = await Promise.all( + closestNodes.map((node) => + this.shardingTableService.findPeerAddressAndProtocols(node.peer_id), + ), + ); await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.FIND_NODES_END, ); - return closestNodes; + return nodesFound; } /** diff --git a/src/commands/protocols/common/find-nodes-local-command.js b/src/commands/protocols/common/find-nodes-local-command.js deleted file mode 100644 index 6522ee761a..0000000000 --- a/src/commands/protocols/common/find-nodes-local-command.js +++ /dev/null @@ -1,37 +0,0 @@ -import FindNodesCommand from './find-nodes-command.js'; -import { OPERATION_ID_STATUS } from '../../../constants/constants.js'; - -class FindNodesLocalCommand extends FindNodesCommand { - async findNodes(keyword, operationId) { - await this.operationIdService.updateOperationIdStatus( - operationId, - OPERATION_ID_STATUS.FIND_NODES_LOCAL_START, - ); - - const closestNodes = await this.networkModuleManager.findNodesLocal(keyword); - - await this.operationIdService.updateOperationIdStatus( - operationId, - OPERATION_ID_STATUS.FIND_NODES_LOCAL_END, - ); - - return closestNodes; - } - - /** - * Builds default findNodesLocalCommand - * @param map - * @returns {{add, data: *, delay: *, deadline: *}} - */ - default(map) { - const command = { - name: 'findNodesLocalCommand', - delay: 0, - transactional: false, - }; - Object.assign(command, map); - return command; - } -} - -export default FindNodesLocalCommand; diff --git a/src/commands/protocols/common/network-protocol-command.js b/src/commands/protocols/common/network-protocol-command.js index dbf4dca996..d2497d14ff 100644 --- a/src/commands/protocols/common/network-protocol-command.js +++ b/src/commands/protocols/common/network-protocol-command.js @@ -13,7 +13,7 @@ class NetworkProtocolCommand extends Command { async execute(command) { const keywords = this.getKeywords(command); const commandSequence = [ - this.getFindNodesCommand(), + 'findNodesCommand', `${this.operationService.getOperationName()}ScheduleMessagesCommand`, ]; @@ -38,19 +38,11 @@ class NetworkProtocolCommand extends Command { return Command.empty(); } - getFindNodesCommand() { - return 'findNodesCommand'; - } - getKeywords() { // overridden by subclasses return []; } - getNextCommandData() { - // overridden by subclasses - } - /** * Builds default protocolNetworkCommand * @param map diff --git a/src/commands/protocols/common/protocol-schedule-messages-command.js b/src/commands/protocols/common/protocol-schedule-messages-command.js index f06e5d06ae..933326d6fc 100644 --- a/src/commands/protocols/common/protocol-schedule-messages-command.js +++ b/src/commands/protocols/common/protocol-schedule-messages-command.js @@ -12,12 +12,12 @@ class ProtocolScheduleMessagesCommand extends Command { * @param command */ async execute(command) { - const { operationId, keyword, batchSize, leftoverNodes, nodesSeen = [] } = command.data; + const { operationId, keyword, batchSize, leftoverNodes, numberOfFoundNodes, blockchain } = + command.data; const currentBatchNodes = leftoverNodes.slice(0, batchSize); const currentBatchLeftoverNodes = batchSize < leftoverNodes.length ? leftoverNodes.slice(batchSize) : []; - currentBatchNodes.forEach((node) => nodesSeen.push(node.id._idB58String)); await this.operationIdService.updateOperationIdStatus(operationId, this.startEvent); @@ -37,13 +37,13 @@ class ProtocolScheduleMessagesCommand extends Command { delay: 0, data: { ...this.getNextCommandData(command), + blockchain, operationId, keyword, node, - numberOfFoundNodes: currentBatchLeftoverNodes.length + nodesSeen.length, + numberOfFoundNodes, batchSize, leftoverNodes: currentBatchLeftoverNodes, - nodesSeen, }, period: 5000, retries: 3, diff --git a/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-init-command.js b/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-init-command.js index 7a49c811ef..a8cf26cb3b 100644 --- a/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-init-command.js +++ b/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-init-command.js @@ -16,7 +16,7 @@ class HandleGetInitCommand extends HandleProtocolMessageCommand { } async prepareMessage(commandData) { - const { assertionId, operationId, networkProtocol } = commandData; + const { assertionId, operationId } = commandData; await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.GET.ASSERTION_EXISTS_LOCAL_START, @@ -24,29 +24,17 @@ class HandleGetInitCommand extends HandleProtocolMessageCommand { const assertionExists = await this.tripleStoreModuleManager.assertionExists(assertionId); - const message = {}; - - if (assertionExists) { - message.messageType = NETWORK_MESSAGE_TYPES.RESPONSES.ACK; - message.messageData = {}; - } else { - const peers = await this.networkModuleManager.findNodesLocal( - assertionId, - networkProtocol, - ); - - message.messageType = NETWORK_MESSAGE_TYPES.RESPONSES.NACK; - message.messageData = { - nodes: await this.networkModuleManager.serializePeers(peers), - }; - } - await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.GET.ASSERTION_EXISTS_LOCAL_END, ); - return message; + return { + messageType: assertionExists + ? NETWORK_MESSAGE_TYPES.RESPONSES.ACK + : NETWORK_MESSAGE_TYPES.RESPONSES.NACK, + messageData: {}, + }; } /** diff --git a/src/commands/protocols/get/sender/get-latest-assertion-id-command.js b/src/commands/protocols/get/sender/get-latest-assertion-id-command.js index 805aa692f4..387b574f68 100644 --- a/src/commands/protocols/get/sender/get-latest-assertion-id-command.js +++ b/src/commands/protocols/get/sender/get-latest-assertion-id-command.js @@ -60,7 +60,28 @@ class GetLatestAssertionIdCommand extends Command { } } - return this.continueSequence({ ...command.data, assertionId }, command.sequence); + // TODO: move this after local get + let blockchain; + try { + blockchain = await Promise.any( + this.blockchainModuleManager + .getImplementationNames() + .map((blockchainId) => + this.blockchainModuleManager + .getAssertionIssuer(blockchain, assertionId) + .then(() => blockchainId), + ), + ); + } catch (error) { + this.logger.warn( + `Assertion id ${assertionId} not found on any of the supported blockchains.`, + ); + } + + return this.continueSequence( + { ...command.data, assertionId, blockchain }, + command.sequence, + ); } /** diff --git a/src/commands/protocols/get/sender/get-schedule-messages-command.js b/src/commands/protocols/get/sender/get-schedule-messages-command.js index 2f4d87c2aa..cc3088f2fa 100644 --- a/src/commands/protocols/get/sender/get-schedule-messages-command.js +++ b/src/commands/protocols/get/sender/get-schedule-messages-command.js @@ -11,11 +11,9 @@ class GetScheduleMessagesCommand extends ProtocolScheduleMessagesCommand { } getNextCommandData(command) { - const { ual, assertionId } = command.data; return { - ual, - assertionId, - newFoundNodes: {}, + ual: command.data.ual, + assertionId: command.data.assertionId, }; } diff --git a/src/commands/protocols/get/sender/network-get-command.js b/src/commands/protocols/get/sender/network-get-command.js index e5dec5d3a4..1c4f3de22a 100644 --- a/src/commands/protocols/get/sender/network-get-command.js +++ b/src/commands/protocols/get/sender/network-get-command.js @@ -14,17 +14,6 @@ class NetworkGetCommand extends NetworkProtocolCommand { return [assertionId]; } - getNextCommandData(command) { - const { assertionId } = command.data; - return { - assertionId, - }; - } - - getFindNodesCommand() { - return 'findNodesLocalCommand'; - } - /** * Builds default networkGetCommand * @param map diff --git a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-init-command.js b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-init-command.js index e0da515483..ae6933c8cd 100644 --- a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-init-command.js +++ b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-init-command.js @@ -11,9 +11,7 @@ class GetInitCommand extends ProtocolInitCommand { } async prepareMessage(command) { - const { assertionId } = command.data; - - return { assertionId }; + return { assertionId: command.data.assertionId }; } async handleNack(command, responseData) { diff --git a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js index 5ae1fa42b0..e196bfe9ed 100644 --- a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js +++ b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js @@ -10,9 +10,7 @@ class GetRequestCommand extends ProtocolRequestCommand { } async prepareMessage(command) { - const { assertionId } = command.data; - - return { assertionId }; + return { assertionId: command.data.assertionId }; } /** diff --git a/src/commands/protocols/publish/sender/network-publish-command.js b/src/commands/protocols/publish/sender/network-publish-command.js index 8e9a9787e5..5b163e51d8 100644 --- a/src/commands/protocols/publish/sender/network-publish-command.js +++ b/src/commands/protocols/publish/sender/network-publish-command.js @@ -17,15 +17,6 @@ class NetworkPublishCommand extends NetworkProtocolCommand { return [command.data.assertionId]; } - getNextCommandData(command) { - const { publishType, assertionId, blockchain, contract } = command.data; - const assertionCommandData = { publishType, assertionId, blockchain, contract }; - - if (publishType === PUBLISH_TYPES.ASSERTION) return assertionCommandData; - - return { ...assertionCommandData, tokenId: command.data.tokenId }; - } - /** * Builds default networkPublishCommand * @param map diff --git a/src/commands/protocols/publish/sender/validate-assertion-command.js b/src/commands/protocols/publish/sender/validate-assertion-command.js index 444de0f245..72cf8f1b71 100644 --- a/src/commands/protocols/publish/sender/validate-assertion-command.js +++ b/src/commands/protocols/publish/sender/validate-assertion-command.js @@ -27,17 +27,12 @@ class ValidateAssertionCommand extends Command { const ual = this.ualService.deriveUAL(blockchain, contract, tokenId); this.logger.info(`Validating assertion with ual: ${ual}`); - let blockchainAssertionId; - try { - blockchainAssertionId = await this.operationService.getAssertion( - blockchain, - contract, - tokenId, - ); - } catch (error) { - this.logger.warn( - `Unable to validate blockchain data for ual: ${ual}. Received error: ${error.message}, retrying.`, - ); + const blockchainAssertionId = await this.operationService.getAssertion( + blockchain, + contract, + tokenId, + ); + if (!blockchainAssertionId) { return Command.retry(); } if (blockchainAssertionId !== assertionId) { diff --git a/src/constants/constants.js b/src/constants/constants.js index 407012e5a3..635f4199a6 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -4,8 +4,14 @@ export const LIBP2P_KEY_DIRECTORY = 'libp2p'; export const LIBP2P_KEY_FILENAME = 'privateKey'; +export const BLOCKCHAIN_IDENTITY_DIRECTORY = 'blockchain'; + export const TRIPLE_STORE_CONNECT_MAX_RETRIES = 10; +export const DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS = 15 * 24 * 60 * 60 * 1000; + +export const MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH = 500; + export const TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; export const MAX_FILE_SIZE = 2621440; @@ -14,6 +20,8 @@ export const PUBLISH_TYPES = { ASSERTION: 'assertion', ASSET: 'asset', INDEX: 'i export const DHT_TYPES = { DUAL: 'dual', WAN: 'wan', LAN: 'lan' }; +export const PEER_OFFLINE_LIMIT = 24 * 60 * 60 * 1000; + /** * Triple store media types * @type {{APPLICATION_JSON: string, N_QUADS: string, SPARQL_RESULTS_JSON: string, LD_JSON: string}} @@ -38,7 +46,9 @@ export const XML_DATA_TYPES = { export const MIN_NODE_VERSION = 16; -export const INIT_STAKE_AMOUNT = 3000; +export const INIT_ASK_AMOUNT = 5; // TODO: Change value + +export const INIT_STAKE_AMOUNT = '50000'; export const NETWORK_API_RATE_LIMIT = { TIME_WINDOW_MILLS: 1 * 60 * 1000, @@ -62,11 +72,16 @@ export const REMOVE_SESSION_COMMAND_DELAY = 2 * 60 * 1000; export const OPERATION_IDS_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +export const DIAL_PEERS_COMMAND_FREQUENCY_MILLS = 30 * 1000; + +export const DIAL_PEERS_CONCURRENCY = 10; + export const PERMANENT_COMMANDS = [ 'otnodeUpdateCommand', 'sendTelemetryCommand', 'operationIdCleanerCommand', 'commandsCleanerCommand', + 'dialPeersCommand', ]; export const MAX_COMMAND_DELAY_IN_MILLS = 14400 * 60 * 1000; // 10 days @@ -262,3 +277,11 @@ export const QUERY_TYPES = { SELECT: 'SELECT', CONSTRUCT: 'CONSTRUCT', }; + +/** + * Contract names + * @type {{SHARDING_TABLE_CONTRACT: string}} + */ +export const CONTRACTS = { + SHARDING_TABLE_CONTRACT: 'ShardingTableContract', +}; diff --git a/src/migration/blockchain-identity-migration.js b/src/migration/blockchain-identity-migration.js new file mode 100644 index 0000000000..be6fd764e5 --- /dev/null +++ b/src/migration/blockchain-identity-migration.js @@ -0,0 +1,27 @@ +import path from 'path'; +import appRootPath from 'app-root-path'; +import BaseMigration from './base-migration.js'; + +class BlockchainIdentityMigration extends BaseMigration { + async executeMigration() { + if (process.env.NODE_ENV !== 'development' && process.env.NODE_ENV !== 'test') { + const configurationFolderPath = path.join(appRootPath.path, '..'); + const configurationFilePath = path.join( + configurationFolderPath, + this.config.configFilename, + ); + + const config = await this.fileService.loadJsonFromFile(configurationFilePath); + for (const blockchainImpl in config.modules.blockchain.implementation) { + delete config.modules.blockchain.implementation[blockchainImpl].config.identity; + } + await this.fileService.writeContentsToFile( + configurationFolderPath, + this.config.configFilename, + JSON.stringify(config, null, 4), + ); + } + } +} + +export default BlockchainIdentityMigration; diff --git a/src/modules/base-module-manager.js b/src/modules/base-module-manager.js index a6d421b209..cf79166b68 100644 --- a/src/modules/base-module-manager.js +++ b/src/modules/base-module-manager.js @@ -92,7 +92,7 @@ class BaseModuleManager { return this.handlers[name]; } - getImplementationsNames() { + getImplementationNames() { return Object.keys(this.handlers); } diff --git a/src/modules/blockchain/blockchain-module-manager.js b/src/modules/blockchain/blockchain-module-manager.js index 5e9eb15e3e..381688d04b 100644 --- a/src/modules/blockchain/blockchain-module-manager.js +++ b/src/modules/blockchain/blockchain-module-manager.js @@ -47,6 +47,18 @@ class BlockchainModuleManager extends BaseModuleManager { } } + async profileExists(blockchain, identity) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.profileExists(identity); + } + } + + async saveIdentityInFile(blockchain) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.saveIdentityInFile(); + } + } + async getEpochs(blockchain, UAI) { if (this.getImplementation(blockchain)) { return this.getImplementation(blockchain).module.getEpochs(UAI); @@ -106,6 +118,96 @@ class BlockchainModuleManager extends BaseModuleManager { return this.getImplementation(blockchain).module.restartService(); } } + + async getAssertionIssuer(blockchain, assertionId) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.getAssertionIssuer(assertionId); + } + } + + async getShardingTableHead(blockchain) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.getShardingTableHead(); + } + } + + async getShardingTableLength(blockchain) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.getShardingTableLength(); + } + } + + async getShardingTablePage(blockchain, startingPeerId, nodesNum) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.getShardingTablePage( + startingPeerId, + nodesNum, + ); + } + } + + async getShardingTableFull(blockchain) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.getShardingTableFull(); + } + } + + async pushPeerBack(blockchain, peerId, ask, stake) { + if (this.getImplementation(blockchain)) { + await this.getImplementation(blockchain).module.pushPeerBack(peerId, ask, stake); + } + } + + async pushPeerFront(blockchain, peerId, ask, stake) { + if (this.getImplementation(blockchain)) { + await this.getImplementation(blockchain).module.pushPeerFront(peerId, ask, stake); + } + } + + async updatePeerParams(blockchain, peerId, ask, stake) { + if (this.getImplementation(blockchain)) { + await this.getImplementation(blockchain).module.updatePeerParams(peerId, ask, stake); + } + } + + async removePeer(blockchain, peerId) { + if (this.getImplementation(blockchain)) { + await this.getImplementation(blockchain).module.removePeer(peerId); + } + } + + async getAllPastEvents( + contractName, + onEventsReceived, + getLastCheckedBlock, + updateLastCheckedBlock, + ) { + const blockchainIds = this.getImplementationNames(); + const getEventsPromises = []; + for (const blockchainId of blockchainIds) { + getEventsPromises.push( + this.getImplementation(blockchainId).module.getAllPastEvents( + contractName, + onEventsReceived, + getLastCheckedBlock, + updateLastCheckedBlock, + ), + ); + } + return Promise.all(getEventsPromises); + } + + convertAsciiToHex(blockchain, peerId) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.convertAsciiToHex(peerId); + } + } + + convertHexToAscii(blockchain, peerIdHex) { + if (this.getImplementation(blockchain)) { + return this.getImplementation(blockchain).module.convertHexToAscii(peerIdHex); + } + } } export default BlockchainModuleManager; diff --git a/src/modules/blockchain/implementation/eth/eth-service.js b/src/modules/blockchain/implementation/eth/eth-service.js index f39716acc2..e8789bccb3 100644 --- a/src/modules/blockchain/implementation/eth/eth-service.js +++ b/src/modules/blockchain/implementation/eth/eth-service.js @@ -7,6 +7,10 @@ class EthService extends Web3Service { this.baseTokenTicker = 'ETH'; this.tracTicker = 'TRAC'; } + + getBlockchainId() { + return 'eth'; + } } export default EthService; diff --git a/src/modules/blockchain/implementation/ganache/ganache-service.js b/src/modules/blockchain/implementation/ganache/ganache-service.js index 3762d6c407..48219798db 100644 --- a/src/modules/blockchain/implementation/ganache/ganache-service.js +++ b/src/modules/blockchain/implementation/ganache/ganache-service.js @@ -7,6 +7,10 @@ class GanacheService extends Web3Service { this.baseTokenTicker = 'GANACHE_TOKENS'; this.tracTicker = 'gTRAC'; } + + getBlockchainId() { + return 'ganache'; + } } export default GanacheService; diff --git a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js index 612af2511c..be87ce6f32 100644 --- a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js +++ b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js @@ -15,10 +15,9 @@ class OtParachainService extends Web3Service { this.config = config; this.logger = logger; this.rpcNumber = 0; - - await Promise.all([this.initializeWeb3(), this.initializeParachainProvider()]); + await this.initializeParachainProvider(); await this.checkEvmAccountsMapping(); - await this.initializeContracts(); + await super.initialize(config, logger); } async checkEvmAccountsMapping() { @@ -101,7 +100,7 @@ class OtParachainService extends Web3Service { if (this.config.gasPriceOracleLink) return super.getGasPrice(); try { - return (await this.web3.eth.getGasPrice()) * 1000; + return (await this.web3.eth.getGasPrice()) * 1000000; } catch (error) { return undefined; } @@ -137,6 +136,10 @@ class OtParachainService extends Web3Service { const nativeBalance = await this.web3.eth.getBalance(this.getPublicKey()); return nativeBalance / 10 ** NATIVE_TOKEN_DECIMALS; } + + getBlockchainId() { + return 'otp'; + } } export default OtParachainService; diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index a32cf968dd..732ae0e99d 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -1,16 +1,27 @@ import Web3 from 'web3'; import axios from 'axios'; -import { peerId2Hash } from 'assertion-tools'; import { createRequire } from 'module'; -import { INIT_STAKE_AMOUNT, WEBSOCKET_PROVIDER_OPTIONS } from '../../../constants/constants.js'; +import { join } from 'path'; +import appRootPath from 'app-root-path'; +import { mkdir, readFile, stat, writeFile } from 'fs/promises'; +import { + INIT_ASK_AMOUNT, + INIT_STAKE_AMOUNT, + BLOCKCHAIN_IDENTITY_DIRECTORY, + WEBSOCKET_PROVIDER_OPTIONS, + DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS, + MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH, +} from '../../../constants/constants.js'; const require = createRequire(import.meta.url); const Hub = require('dkg-evm-module/build/contracts/Hub.json'); +const AssertionRegistry = require('dkg-evm-module/build/contracts/AssertionRegistry.json'); const AssetRegistry = require('dkg-evm-module/build/contracts/AssetRegistry.json'); const ERC20Token = require('dkg-evm-module/build/contracts/ERC20Token.json'); const Identity = require('dkg-evm-module/build/contracts/Identity.json'); const Profile = require('dkg-evm-module/build/contracts/Profile.json'); const ProfileStorage = require('dkg-evm-module/build/contracts/ProfileStorage.json'); +const ShardingTable = require('dkg-evm-module/build/contracts/ShardingTable.json'); class Web3Service { async initialize(config, logger) { @@ -18,7 +29,9 @@ class Web3Service { this.logger = logger; this.rpcNumber = 0; + await this.readIdentity(); await this.initializeWeb3(); + this.currentBlock = await this.web3.eth.getBlockNumber(); await this.initializeContracts(); } @@ -59,6 +72,26 @@ class Web3Service { this.logger.info(`Hub contract address is ${this.config.hubContractAddress}`); this.hubContract = new this.web3.eth.Contract(Hub.abi, this.config.hubContractAddress); + const shardingTableAddress = await this.callContractFunction( + this.hubContract, + 'getContractAddress', + ['ShardingTable'], + ); + this.ShardingTableContract = new this.web3.eth.Contract( + ShardingTable.abi, + shardingTableAddress, + ); + + const assertionRegistryAddress = await this.callContractFunction( + this.hubContract, + 'getContractAddress', + ['AssertionRegistry'], + ); + this.AssertionRegistryContract = new this.web3.eth.Contract( + AssertionRegistry.abi, + assertionRegistryAddress, + ); + const assetRegistryAddress = await this.callContractFunction( this.hubContract, 'getContractAddress', @@ -104,6 +137,56 @@ class Web3Service { await this.logBalances(); } + async readIdentity() { + this.config.identity = await this.getIdentityFromFile(); + } + + getKeyPath() { + let directoryPath; + if (process.env.NODE_ENV === 'testnet' || process.env.NODE_ENV === 'mainnet') { + directoryPath = join( + appRootPath.path, + '..', + this.config.appDataPath, + BLOCKCHAIN_IDENTITY_DIRECTORY, + ); + } else { + directoryPath = join( + appRootPath.path, + this.config.appDataPath, + BLOCKCHAIN_IDENTITY_DIRECTORY, + ); + } + + const fullPath = join(directoryPath, this.config.identityFileName); + return { fullPath, directoryPath }; + } + + async getIdentityFromFile() { + const keyPath = this.getKeyPath(); + if (await this.fileExists(keyPath.fullPath)) { + const key = (await readFile(keyPath.fullPath)).toString(); + return key; + } + } + + async fileExists(filePath) { + try { + await stat(filePath); + return true; + } catch (e) { + return false; + } + } + + async saveIdentityInFile() { + if (process.env.NODE_ENV !== 'development' && process.env.NODE_ENV !== 'test') { + const { fullPath, directoryPath } = this.getKeyPath(); + await mkdir(directoryPath, { recursive: true }); + await writeFile(fullPath, this.config.identity); + } + } + async logBalances() { const nativeBalance = await this.getNativeTokenBalance(); const tokenBalance = await this.getTokenBalance(); @@ -144,25 +227,36 @@ class Web3Service { } async deployIdentity() { - const transactionReceipt = await this.deployContract(Identity, [ - this.getPublicKey(), - this.getManagementKey(), + if (!this.config.identity) { + const transactionReceipt = await this.deployContract(Identity, [ + this.getPublicKey(), + this.getManagementKey(), + ]); + this.config.identity = transactionReceipt.contractAddress; + } else { + this.logger.info(`Using existing identity: ${this.config.identity}`); + } + } + + async profileExists(identity) { + const nodeId = await this.callContractFunction(this.ProfileStorageContract, 'getNodeId', [ + identity, ]); - this.config.identity = transactionReceipt.contractAddress; + return nodeId != null; } async createProfile(peerId) { + const stakeAmount = Web3.utils.toWei(INIT_STAKE_AMOUNT, 'ether'); await this.executeContractFunction(this.TokenContract, 'increaseAllowance', [ this.ProfileContract.options.address, - INIT_STAKE_AMOUNT, + stakeAmount, ]); - const nodeId = await peerId2Hash(peerId); - await this.executeContractFunction(this.ProfileContract, 'createProfile', [ this.getManagementKey(), - nodeId, - INIT_STAKE_AMOUNT, + this.convertAsciiToHex(peerId), + INIT_ASK_AMOUNT, + stakeAmount, this.getIdentity(), ]); } @@ -222,7 +316,7 @@ class Web3Service { async callContractFunction(contractInstance, functionName, args) { let result; - while (!result) { + while (result === undefined) { try { // eslint-disable-next-line no-await-in-loop result = await contractInstance.methods[functionName](...args).call(); @@ -237,7 +331,7 @@ class Web3Service { async executeContractFunction(contractInstance, functionName, args) { let result; - while (!result) { + while (result === undefined) { try { /* eslint-disable no-await-in-loop */ const gasPrice = await this.getGasPrice(); @@ -270,6 +364,75 @@ class Web3Service { return result; } + async getAllPastEvents( + contractName, + onEventsReceived, + getLastCheckedBlock, + updateLastCheckedBlock, + ) { + const contract = this[contractName]; + if (!contract) { + throw Error(`Error while getting all past events. Unknown contract: ${contractName}`); + } + + const blockchainId = this.getBlockchainId(); + const lastCheckedBlockObject = await getLastCheckedBlock(blockchainId, contractName); + + let fromBlock; + + if ( + this.isOlderThan( + lastCheckedBlockObject?.last_checked_timestamp, + DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS, + ) + ) { + fromBlock = this.currentBlock - 10; + } else { + this.currentBlock = await this.getBlockNumber(); + fromBlock = lastCheckedBlockObject.last_checked_block + 1; + } + + let events = []; + if (this.currentBlock - fromBlock > MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH) { + let iteration = 1; + + while (fromBlock - MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH > this.currentBlock) { + events.concat( + await contract.getPastEvents('allEvents', { + fromBlock, + toBlock: fromBlock + MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH * iteration, + }), + ); + fromBlock += MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH * iteration; + iteration += 1; + } + } else { + events = await contract.getPastEvents('allEvents', { + fromBlock, + toBlock: this.currentBlock, + }); + } + + await updateLastCheckedBlock(blockchainId, this.currentBlock, Date.now(), contractName); + if (events.length > 0) { + await onEventsReceived( + events.map((event) => ({ + contract: contractName, + event: event.event, + data: JSON.stringify(event.returnValues), + block: event.blockNumber, + blockchainId, + })), + ); + } + } + + isOlderThan(timestamp, olderThanInMills) { + if (!timestamp) return true; + const timestampThirtyDaysInPast = new Date().getTime() - olderThanInMills; + return timestamp < timestampThirtyDaysInPast; + } + async deployContract(contract, args) { let result; while (!result) { @@ -296,8 +459,8 @@ class Web3Service { const tx = { from: this.getPublicKey(), data: encodedABI, - gasPrice: gasPrice || this.web3.utils.toWei('20', 'Gwei'), - gas: gasLimit || this.web3.utils.toWei('900', 'Kwei'), + gasPrice: gasPrice ?? this.web3.utils.toWei('20', 'Gwei'), + gas: gasLimit ?? this.web3.utils.toWei('900', 'Kwei'), }; const createdTransaction = await this.web3.eth.accounts.signTransaction( @@ -363,6 +526,115 @@ class Web3Service { await this.initializeWeb3(); await this.initializeContracts(); } + + async getAssertionIssuer(assertionId) { + return this.callContractFunction(this.AssertionRegistryContract, 'getIssuer', [ + assertionId, + ]); + } + + async getShardingTableHead() { + try { + return await this.callContractFunction(this.ShardingTableContract, 'head', []); + } catch (e) { + this.logger.error(`Error on calling contract function. ${e}`); + return false; + } + } + + async getShardingTableLength() { + try { + return await this.callContractFunction(this.ShardingTableContract, 'nodesCount', []); + } catch (e) { + this.logger.error(`Error on calling contract function. ${e}`); + return false; + } + } + + async getShardingTablePage(startingPeerId, nodesNum) { + try { + return await this.callContractFunction(this.ShardingTableContract, 'getShardingTable', [ + startingPeerId, + nodesNum, + ]); + } catch (e) { + this.logger.error(`Error on calling contract function. ${e}`); + return false; + } + } + + async getShardingTableFull() { + try { + return await this.callContractFunction( + this.ShardingTableContract, + 'getShardingTable', + [], + ); + } catch (e) { + this.logger.error(`Error on calling contract function. ${e}`); + return false; + } + } + + async pushPeerBack(peerId, ask, stake) { + try { + return this.executeContractFunction(this.ShardingTableContract, 'pushBack', [ + peerId, + ask, + stake, + ]); + } catch (e) { + this.logger.error(`Error on executing contract function. ${e}`); + return false; + } + } + + async pushPeerFront(peerId, ask, stake) { + try { + return this.executeContractFunction(this.ShardingTableContract, 'pushFront', [ + peerId, + ask, + stake, + ]); + } catch (e) { + this.logger.error(`Error on executing contract function. ${e}`); + return false; + } + } + + async updatePeerParams(peerId, ask, stake) { + try { + return this.executeContractFunction(this.ShardingTableContract, 'updateParams', [ + peerId, + ask, + stake, + ]); + } catch (e) { + this.logger.error(`Error on executing contract function. ${e}`); + return false; + } + } + + async removePeer(peerId) { + try { + return this.executeContractFunction(this.ShardingTableContract, 'removePeer', [peerId]); + } catch (e) { + this.logger.error(`Error on executing contract function. ${e}`); + return false; + } + } + + getBlockchainId() { + throw Error('Get blockchain id not implemented'); + } + + convertAsciiToHex(peerId) { + return Web3.utils.asciiToHex(peerId); + } + + convertHexToAscii(peerIdHex) { + return Web3.utils.hexToAscii(peerIdHex); + } } export default Web3Service; diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js index 281dc62779..2c34896860 100644 --- a/src/modules/network/implementation/libp2p-service.js +++ b/src/modules/network/implementation/libp2p-service.js @@ -7,7 +7,6 @@ import { NOISE } from 'libp2p-noise'; import MPLEX from 'libp2p-mplex'; import TCP from 'libp2p-tcp'; import pipe from 'it-pipe'; -import { Multiaddr } from 'multiaddr'; import { encode, decode } from 'it-length-prefixed'; import { sha256 } from 'multiformats/hashes/sha2'; import map from 'it-map'; @@ -77,9 +76,7 @@ class Libp2pService { if (!this.config.privateKey) { id = await _create({ bits: 1024, keyType: 'RSA' }); this.config.privateKey = id.toJSON().privKey; - if (process.env.NODE_ENV === 'development') { - await this.savePrivateKeyInFile(this.config.privateKey); - } + await this.savePrivateKeyInFile(this.config.privateKey); } else { id = await createFromPrivKey(this.config.privateKey); } @@ -103,11 +100,15 @@ class Libp2pService { this.node = await libp2p.create(initializationObject); await this.node.start(); const port = parseInt(this.node.multiaddrs.toString().split('/')[4], 10); - const peerId = this.node.peerId._idB58String; + const peerId = this.node.peerId.toB58String(); this.config.id = peerId; this.logger.info(`Network ID is ${peerId}, connection port is ${port}`); } + async onPeerConnected(listener) { + this.node.connectionManager.on('peer:connect', listener); + } + async savePrivateKeyInFile(privateKey) { const { fullPath, directoryPath } = this.getKeyPath(); await mkdir(directoryPath, { recursive: true }); @@ -179,44 +180,15 @@ class Libp2pService { return this.node.peerStore.addressBook.get(peerId); } - serializePeer(peer) { - return { - id: peer.id._idB58String, - multiaddrs: (peer.multiaddrs ?? []).map((addr) => addr.multiaddr), - protocols: peer.protocols ?? [], - }; - } - - serializePeers(peers) { - return peers.map((peer) => this.serializePeer(peer)); - } - - deserializePeer(serializedPeer) { - const peerId = createFromB58String(serializedPeer.id); - const multiaddrs = serializedPeer.multiaddrs.map((addr) => new Multiaddr(addr)); - - this.node.peerStore.addressBook.add(peerId, multiaddrs); - this.node.peerStore.protoBook.add(peerId, serializedPeer.protocols); - - return { - id: peerId, - multiaddrs: serializedPeer.multiaddrs ?? [], - protocols: serializedPeer.protocols ?? [], - }; - } - - deserializePeers(serializedPeers) { - return serializedPeers.map((peer) => this.deserializePeer(peer)); - } - async sortPeers(key, peers, count = this.config.dht.kBucketSize) { - const keyHash = await this.toHash(new TextEncoder().encode(key)); + const textEncoder = new TextEncoder(); + const keyHash = await this.toHash(textEncoder.encode(key)); const sorted = pipe( peers, (source) => map(source, async (peer) => ({ peer, - distance: uint8ArrayXor(keyHash, await this.toHash(peer.id.toBytes())), + distance: uint8ArrayXor(keyHash, Buffer.from(peer.sha256.slice(2), 'hex')), })), (source) => sort(source, (a, b) => uint8ArrayCompare(a.distance, b.distance)), (source) => take(source, count), @@ -230,41 +202,6 @@ class Libp2pService { return Buffer.from((await sha256.digest(encodedKey)).digest); } - async findNodesLocal(key) { - const keyHash = await this.toHash(new TextEncoder().encode(key)); - - const nodes = this.node._dht.routingTable.closestPeers( - keyHash, - this.config.dht.kBucketSize, - ); - - const result = []; - for (const node of nodes) { - result.push({ - id: node, - multiaddrs: this.getAddresses(node), - protocols: this.getProtocols(node), - }); - } - - return result; - } - - async findNodes(key) { - const encodedKey = new TextEncoder().encode(key); - const nodes = this.node._dht.peerRouting.getClosestPeers(encodedKey); - const result = []; - for await (const node of nodes) { - result.push({ - id: node, - multiaddrs: this.getAddresses(node), - protocols: this.getProtocols(node), - }); - } - - return result; - } - getPeers() { return this.node.connectionManager.connections; } @@ -278,7 +215,7 @@ class Libp2pService { this.node.handle(protocol, async (handlerProps) => { const { stream } = handlerProps; - const remotePeerId = handlerProps.connection.remotePeer._idB58String; + const remotePeerId = handlerProps.connection.remotePeer.toB58String(); const { message, valid, busy } = await this._readMessageFromStream( stream, this.isRequestValid.bind(this), @@ -369,15 +306,17 @@ class Libp2pService { }; } - async sendMessage(protocol, remotePeerId, messageType, operationId, keyword, message) { + async sendMessage(protocol, peerId, messageType, operationId, keyword, message) { const keywordUuid = uuidv5(keyword, uuidv5.URL); - // const sessionStream = this.getSessionStream(operationId, remotePeerId._idB58String); + // const sessionStream = this.getSessionStream(operationId, remotePeerId.toB58String()); // if (!sessionStream) { // } else { // stream = sessionStream; // } + const remotePeerId = createFromB58String(peerId); + const publicIp = (this.getAddresses(remotePeerId) ?? []) .map((addr) => addr.multiaddr) .filter((addr) => addr.isThinWaistAddress()) @@ -385,7 +324,7 @@ class Libp2pService { .filter((splittedAddr) => !ip.isPrivate(splittedAddr[2]))[0]?.[2]; this.logger.trace( - `Dialing remotePeerId: ${remotePeerId._idB58String} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, + `Dialing remotePeerId: ${remotePeerId.toB58String()} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, ); let dialResult; let dialStart; @@ -397,9 +336,7 @@ class Libp2pService { } catch (error) { dialEnd = Date.now(); this.logger.warn( - `Unable to dial peer: ${ - remotePeerId._idB58String - }. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ + `Unable to dial peer: ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ dialEnd - dialStart } ms. Error: ${error.message}`, ); @@ -409,15 +346,13 @@ class Libp2pService { }; } this.logger.trace( - `Created stream for peer: ${ - remotePeerId._idB58String - }. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ + `Created stream for peer: ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ dialEnd - dialStart } ms.`, ); const { stream } = dialResult; - this.updateSessionStream(operationId, keywordUuid, remotePeerId._idB58String, stream); + this.updateSessionStream(operationId, keywordUuid, remotePeerId.toB58String(), stream); const streamMessage = this.createStreamMessage( message, @@ -427,7 +362,7 @@ class Libp2pService { ); this.logger.trace( - `Sending message to ${remotePeerId._idB58String}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, + `Sending message to ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, ); let sendMessageStart; @@ -439,9 +374,7 @@ class Libp2pService { } catch (error) { sendMessageEnd = Date.now(); this.logger.warn( - `Unable to send message to peer: ${ - remotePeerId._idB58String - }. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}, execution time: ${ + `Unable to send message to peer: ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}, execution time: ${ sendMessageEnd - sendMessageStart } ms. Error: ${error.message}`, ); @@ -451,14 +384,14 @@ class Libp2pService { }; } - // if (!this.sessions[remotePeerId._idB58String]) { - // this.sessions[remotePeerId._idB58String] = { + // if (!this.sessions[remotePeerId.toB58String()]) { + // this.sessions[remotePeerId.toB58String()] = { // [operationId]: { // stream // } // } // } else { - // this.sessions[remotePeerId._idB58String][operationId] = { + // this.sessions[remotePeerId.toB58String()][operationId] = { // stream // } // } @@ -473,15 +406,13 @@ class Libp2pService { response = await this._readMessageFromStream( stream, this.isResponseValid.bind(this), - remotePeerId._idB58String, + remotePeerId.toB58String(), ); readResponseEnd = Date.now(); } catch (error) { readResponseEnd = Date.now(); this.logger.warn( - `Unable to read response from peer ${ - remotePeerId._idB58String - }. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ + `Unable to read response from peer ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ readResponseEnd - readResponseStart } ms. Error: ${error.message}`, ); @@ -491,9 +422,7 @@ class Libp2pService { }; } this.logger.trace( - `Receiving response from ${ - remotePeerId._idB58String - }. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ + `Receiving response from ${remotePeerId.toB58String()}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ readResponseEnd - readResponseStart } ms.`, ); @@ -638,7 +567,7 @@ class Libp2pService { sessionExists() { return true; - // return this.sessions[remotePeerId._idB58String] && this.sessions[remotePeerId._idB58String][operationId]; + // return this.sessions[remotePeerId.toB58String()] && this.sessions[remotePeerId.toB58String()][operationId]; } async isResponseValid() { @@ -721,6 +650,18 @@ class Libp2pService { getName() { return 'Libp2p'; } + + async findPeer(peerId) { + return this.node.peerRouting.findPeer(createFromB58String(peerId)); + } + + async dial(peerId) { + return this.node.dial(createFromB58String(peerId)); + } + + async getPeerInfo(peerId) { + return this.node.peerStore.get(createFromB58String(peerId)); + } } export default Libp2pService; diff --git a/src/modules/network/network-module-manager.js b/src/modules/network/network-module-manager.js index 5fabc10d28..50e6d24ed7 100644 --- a/src/modules/network/network-module-manager.js +++ b/src/modules/network/network-module-manager.js @@ -5,15 +5,9 @@ class NetworkModuleManager extends BaseModuleManager { return 'network'; } - serializePeers(peer) { + async onPeerConnected(listener) { if (this.initialized) { - return this.getImplementation().module.serializePeers(peer); - } - } - - deserializePeers(serializedPeers) { - if (this.initialized) { - return this.getImplementation().module.deserializePeers(serializedPeers); + return this.getImplementation().module.onPeerConnected(listener); } } @@ -23,48 +17,18 @@ class NetworkModuleManager extends BaseModuleManager { } } - async findNodes(key) { - if (this.initialized) { - return this.getImplementation().module.findNodes(key); - } - } - - async findNodesLocal(key) { - if (this.initialized) { - return this.getImplementation().module.findNodesLocal(key); - } - } - getMultiaddrs() { if (this.initialized) { return this.getImplementation().module.getMultiaddrs(); } } - getRoutingTableSize() { - if (this.initialized) { - return this.getImplementation().module.getRoutingTableSize(); - } - } - getPeers() { if (this.initialized) { return this.getImplementation().module.getPeers(); } } - async getProtocols(peerId) { - if (this.initialized) { - return this.getImplementation().module.getProtocols(peerId); - } - } - - async getAddresses(peerId) { - if (this.initialized) { - return this.getImplementation().module.getAddresses(peerId); - } - } - async sendMessage(protocol, remotePeerId, messageType, operationId, keyword, message) { if (this.initialized) { return this.getImplementation().module.sendMessage( @@ -109,15 +73,33 @@ class NetworkModuleManager extends BaseModuleManager { } } - getPrivateKey() { + async healthCheck() { if (this.initialized) { - return this.getImplementation().module.getPrivateKey(); + return this.getImplementation().module.healthCheck(); } } - async healthCheck() { + async findPeer(peerId) { if (this.initialized) { - return this.getImplementation().module.healthCheck(); + return this.getImplementation().module.findPeer(peerId); + } + } + + async dial(peerId) { + if (this.initialized) { + return this.getImplementation().module.dial(peerId); + } + } + + async getPeerInfo(peerId) { + if (this.initialized) { + return this.getImplementation().module.getPeerInfo(peerId); + } + } + + async toHash(key) { + if (this.initialized) { + return this.getImplementation().module.toHash(key); } } } diff --git a/src/modules/repository/implementation/sequelize/migrations/20221025120253-create-blockchain-event.js b/src/modules/repository/implementation/sequelize/migrations/20221025120253-create-blockchain-event.js new file mode 100644 index 0000000000..0105c35f4e --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20221025120253-create-blockchain-event.js @@ -0,0 +1,46 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.createTable('blockchain_event', { + id: { + type: Sequelize.INTEGER, + primaryKey: true, + autoIncrement: true, + }, + contract: { + type: Sequelize.STRING, + allowNull: false, + }, + blockchain_id: { + allowNull: false, + type: Sequelize.STRING, + }, + event: { + allowNull: false, + type: Sequelize.STRING, + }, + data: { + allowNull: false, + type: Sequelize.TEXT('long'), + }, + block: { + allowNull: false, + type: Sequelize.INTEGER, + }, + processed: { + allowNull: false, + type: Sequelize.BOOLEAN, + }, + created_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + updated_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + }); +} +export async function down({ context: { queryInterface } }) { + await queryInterface.dropTable('blockchain_event'); +} diff --git a/src/modules/repository/implementation/sequelize/migrations/20221025212800-create-shard.js b/src/modules/repository/implementation/sequelize/migrations/20221025212800-create-shard.js new file mode 100644 index 0000000000..cde676e6b8 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20221025212800-create-shard.js @@ -0,0 +1,37 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.createTable('shard', { + peer_id: { + type: Sequelize.STRING, + primaryKey: true, + }, + blockchain_id: { + type: Sequelize.STRING, + primaryKey: true, + }, + ask: { + type: Sequelize.INTEGER, + allowNull: false, + }, + stake: { + type: Sequelize.INTEGER, + allowNull: false, + }, + last_seen: { + type: Sequelize.DATE, + allowNull: false, + defaultValue: new Date(0), + }, + last_dialed: { + type: Sequelize.DATE, + allowNull: false, + defaultValue: new Date(0), + }, + sha256: { + type: Sequelize.STRING, + allowNull: false, + }, + }); +} +export async function down({ context: { queryInterface } }) { + await queryInterface.dropTable('shard'); +} diff --git a/src/modules/repository/implementation/sequelize/migrations/20221028125900-create-blockchain.js b/src/modules/repository/implementation/sequelize/migrations/20221028125900-create-blockchain.js new file mode 100644 index 0000000000..de8caf714f --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20221028125900-create-blockchain.js @@ -0,0 +1,25 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.createTable('blockchain', { + blockchain_id: { + type: Sequelize.STRING, + primaryKey: true, + }, + contract: { + type: Sequelize.STRING, + primaryKey: true, + }, + last_checked_block: { + type: Sequelize.BIGINT, + allowNull: false, + defaultValue: -1, + }, + last_checked_timestamp: { + type: Sequelize.BIGINT, + allowNull: false, + defaultValue: 0, + }, + }); +} +export async function down({ context: { queryInterface } }) { + await queryInterface.dropTable('blockchain'); +} diff --git a/src/modules/repository/implementation/sequelize/models/blockchain-event.js b/src/modules/repository/implementation/sequelize/models/blockchain-event.js new file mode 100644 index 0000000000..fa00561d5d --- /dev/null +++ b/src/modules/repository/implementation/sequelize/models/blockchain-event.js @@ -0,0 +1,23 @@ +export default (sequelize, DataTypes) => { + const event = sequelize.define( + 'blockchain_event', + { + id: { + type: DataTypes.INTEGER, + primaryKey: true, + autoIncrement: true, + }, + contract: DataTypes.STRING, + blockchain_id: DataTypes.STRING, + event: DataTypes.STRING, + data: DataTypes.TEXT, + block: DataTypes.INTEGER, + processed: DataTypes.BOOLEAN, + }, + {}, + ); + event.associate = () => { + // associations can be defined here + }; + return event; +}; diff --git a/src/modules/repository/implementation/sequelize/models/blockchain.js b/src/modules/repository/implementation/sequelize/models/blockchain.js new file mode 100644 index 0000000000..ea3451f402 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/models/blockchain.js @@ -0,0 +1,22 @@ +export default (sequelize, DataTypes) => { + const blockchain = sequelize.define( + 'blockchain', + { + blockchain_id: { + type: DataTypes.STRING, + primaryKey: true, + }, + contract: { + type: DataTypes.STRING, + primaryKey: true, + }, + last_checked_block: DataTypes.BIGINT, + last_checked_timestamp: DataTypes.BIGINT, + }, + { underscored: true }, + ); + blockchain.associate = () => { + // associations can be defined here + }; + return blockchain; +}; diff --git a/src/modules/repository/implementation/sequelize/models/shard.js b/src/modules/repository/implementation/sequelize/models/shard.js new file mode 100644 index 0000000000..f8d5acf617 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/models/shard.js @@ -0,0 +1,19 @@ +export default (sequelize, DataTypes) => { + const shard = sequelize.define( + 'shard', + { + peer_id: { type: DataTypes.STRING, primaryKey: true }, + blockchain_id: { type: DataTypes.STRING, primaryKey: true }, + ask: DataTypes.INTEGER, + stake: DataTypes.INTEGER, + last_seen: DataTypes.DATE, + last_dialed: DataTypes.DATE, + sha256: DataTypes.STRING, + }, + { underscored: true }, + ); + shard.associate = () => { + // associations can be defined here + }; + return shard; +}; diff --git a/src/modules/repository/implementation/sequelize/sequelize-repository.js b/src/modules/repository/implementation/sequelize/sequelize-repository.js index f4bc3b4026..d53bab0097 100644 --- a/src/modules/repository/implementation/sequelize/sequelize-repository.js +++ b/src/modules/repository/implementation/sequelize/sequelize-repository.js @@ -243,6 +243,128 @@ class SequelizeRepository { }); } + // Sharding Table + async createManyPeerRecords(peers) { + return this.models.shard.bulkCreate(peers, { + ignoreDuplicates: true, + }); + } + + async createPeerRecord(peerId, blockchain, ask, stake, lastSeen, sha256) { + return this.models.shard.create( + { + peer_id: peerId, + blockchain_id: blockchain, + ask, + stake, + last_seen: lastSeen, + sha256, + }, + { + ignoreDuplicates: true, + }, + ); + } + + async getAllPeerRecords(blockchain) { + return this.models.shard.findAll({ + where: { + blockchain_id: { + [Sequelize.Op.eq]: blockchain, + }, + }, + raw: true, + }); + } + + async getPeersToDial(limit) { + return this.models.shard.findAll({ + attributes: ['peer_id'], + order: [['last_dialed', 'asc']], + limit, + raw: true, + }); + } + + async updatePeerAsk(peerId, ask) { + await this.models.shard.update( + { + ask, + }, + { + where: { peer_id: peerId }, + }, + ); + } + + async updatePeerStake(peerId, stake) { + await this.models.shard.update( + { + stake, + }, + { + where: { peer_id: peerId }, + }, + ); + } + + async updatePeerRecordLastDialed(peerId) { + await this.models.shard.update( + { + last_dialed: new Date(), + }, + { + where: { peer_id: peerId }, + }, + ); + } + + async updatePeerRecordLastSeenAndLastDialed(peerId) { + await this.models.shard.update( + { + last_dialed: new Date(), + last_seen: new Date(), + }, + { + where: { peer_id: peerId }, + }, + ); + } + + async removePeerRecord(peerId) { + await this.models.shard.destroy({ + where: { + peer_id: peerId, + }, + }); + } + + async updatePeerLastSeen(peerId, lastSeen) { + await this.models.shard.update( + { last_seen: lastSeen }, + { + where: { peer_id: peerId }, + }, + ); + } + + async getLastCheckedBlock(blockchainId, contract) { + return this.models.blockchain.findOne({ + attributes: ['last_checked_block', 'last_checked_timestamp'], + where: { blockchain_id: blockchainId, contract }, + raw: true, + }); + } + + async updateLastCheckedBlock(blockchainId, currentBlock, timestamp, contract) { + return this.models.blockchain.upsert({ + blockchain_id: blockchainId, + contract, + last_checked_block: currentBlock, + last_checked_timestamp: timestamp, + }); + } + // EVENT async createEventRecord(operationId, name, timestamp, value1, value2, value3) { return this.models.event.create({ @@ -281,7 +403,7 @@ class SequelizeRepository { }, }, }, - order: [['timestamp', 'ASC']], + order: [['timestamp', 'asc']], limit: Math.floor(HIGH_TRAFFIC_OPERATIONS_NUMBER_PER_HOUR / 60) * SEND_TELEMETRY_COMMAND_FREQUENCY_MINUTES, @@ -344,6 +466,94 @@ class SequelizeRepository { return abilities.map((e) => e.name); } + + async insertBlockchainEvents(blockchainEvents) { + const insertPromises = []; + for (const event of blockchainEvents) { + insertPromises.push( + new Promise((resolve, reject) => { + this.blockchainEventExists( + event.contract, + event.event, + event.data, + event.block, + event.blockchainId, + ) + .then(async (exists) => { + if (!exists) { + await this.models.blockchain_event + .create({ + contract: event.contract, + event: event.event, + data: event.data, + block: event.block, + blockchain_id: event.blockchainId, + processed: 0, + }) + .then((result) => resolve(result)); + } + resolve(null); + }) + .catch((error) => { + reject(error); + }); + }), + ); + } + return Promise.all(insertPromises); + } + + async blockchainEventExists(contract, event, data, block, blockchainId) { + const dbEvent = await this.models.blockchain_event.findOne({ + where: { + contract, + event, + data, + block, + blockchain_id: blockchainId, + }, + }); + return !!dbEvent; + } + + async markBlockchainEventAsProcessed( + id, + contract = null, + event = null, + data = null, + block = null, + blockchainId = null, + ) { + let condition; + if (id) { + condition = { + where: { + id, + }, + }; + } else { + condition = { + where: { + contract, + event, + data, + block, + blockchain_id: blockchainId, + }, + }; + } + return this.models.blockchain_event.update({ processed: true }, condition); + } + + async getLastEvent(contractName, blockchainId) { + return this.models.blockchain_event.findOne({ + where: { + contract: contractName, + blockchain_id: blockchainId, + }, + order: [['block', 'DESC']], + }); + } } export default SequelizeRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 863564629d..0a4fd2bc15 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -145,6 +145,80 @@ class RepositoryModuleManager extends BaseModuleManager { } } + // Sharding Table + async createManyPeerRecords(peers) { + if (this.initialized) { + return this.getImplementation().module.createManyPeerRecords(peers); + } + } + + async createPeerRecord(peerId, blockchain, ask, stake, lastSeen, sha256) { + if (this.initialized) { + return this.getImplementation().module.createPeerRecord( + peerId, + blockchain, + ask, + stake, + lastSeen, + sha256, + ); + } + } + + async getAllPeerRecords(blockchain) { + if (this.initialized) { + return this.getImplementation().module.getAllPeerRecords(blockchain); + } + } + + async getPeersToDial(limit) { + if (this.initialized) { + return this.getImplementation().module.getPeersToDial(limit); + } + } + + async removePeerRecord(peerId) { + if (this.initialized) { + return this.getImplementation().module.removePeerRecord(peerId); + } + } + + async updatePeerRecordLastDialed(peerId) { + if (this.initialized) { + return this.getImplementation().module.updatePeerRecordLastDialed(peerId); + } + } + + async updatePeerRecordLastSeenAndLastDialed(peerId) { + if (this.initialized) { + return this.getImplementation().module.updatePeerRecordLastSeenAndLastDialed(peerId); + } + } + + async updatePeerAsk(peerId, ask) { + if (this.initialized) { + return this.getImplementation().module.updatePeerAsk(peerId, ask); + } + } + + async updatePeerStake(peerId, stake) { + if (this.initialized) { + return this.getImplementation().module.updatePeerStake(peerId, stake); + } + } + + async getNeighbourhood(assertionId, r2) { + if (this.initialized) { + return this.getImplementation().module.getNeighbourhood(assertionId, r2); + } + } + + async updatePeerLastSeen(peerId, lastSeen) { + if (this.initialized) { + return this.getImplementation().module.updatePeerLastSeen(peerId, lastSeen); + } + } + // EVENT async createEventRecord( operationId, @@ -201,6 +275,41 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getImplementation().module.getTokenAbilities(tokenId); } } + + async insertBlockchainEvents(events) { + if (this.initialized) { + return this.getImplementation().module.insertBlockchainEvents(events); + } + } + + async getLastEvent(contractName, blockchainId) { + if (this.initialized) { + return this.getImplementation().module.getLastEvent(contractName, blockchainId); + } + } + + async markBlockchainEventAsProcessed() { + if (this.initialized) { + return this.getImplementation().module.markBlockchainEventAsProcessed(); + } + } + + async getLastCheckedBlock(blockchainId, contract) { + if (this.initialized) { + return this.getImplementation().module.getLastCheckedBlock(blockchainId, contract); + } + } + + async updateLastCheckedBlock(blockchainId, currentBlock, timestamp, contract) { + if (this.initialized) { + return this.getImplementation().module.updateLastCheckedBlock( + blockchainId, + currentBlock, + timestamp, + contract, + ); + } + } } export default RepositoryModuleManager; diff --git a/src/service/get-service.js b/src/service/get-service.js index cfb14cb024..ae82755248 100644 --- a/src/service/get-service.js +++ b/src/service/get-service.js @@ -28,15 +28,7 @@ class GetService extends OperationService { } async processResponse(command, responseStatus, responseData, errorMessage = null) { - const { - operationId, - numberOfFoundNodes, - leftoverNodes, - keyword, - batchSize, - nodesSeen, - newFoundNodes, - } = command.data; + const { operationId, numberOfFoundNodes, leftoverNodes, keyword, batchSize } = command.data; const keywordsStatuses = await this.getResponsesStatuses( responseStatus, @@ -61,26 +53,6 @@ class GetService extends OperationService { this.completedStatuses, ); this.logResponsesSummary(completedNumber, failedNumber); - } else if (responseData?.nodes?.length) { - const leftoverNodesString = leftoverNodes.map((node) => node.id._idB58String); - const thisNodeId = this.networkModuleManager.getPeerId()._idB58String; - const newDiscoveredNodes = responseData.nodes.filter( - (node) => - node.id !== thisNodeId && - !nodesSeen.includes(node.id) && - !leftoverNodesString.includes(node.id), - ); - - const deserializedNodes = - this.networkModuleManager.deserializePeers(newDiscoveredNodes); - for (const node of deserializedNodes) { - for (const protocol of this.getNetworkProtocols()) { - if (node.protocols.includes(protocol)) { - newFoundNodes[node.id._idB58String] = { ...node, protocol }; - break; - } - } - } } if ( @@ -100,12 +72,7 @@ class GetService extends OperationService { ); this.logResponsesSummary(completedNumber, failedNumber); } else { - const newLeftoverNodes = await this.networkModuleManager.sortPeers( - keyword, - leftoverNodes.concat(Object.values(newFoundNodes)), - leftoverNodes.length, - ); - await this.scheduleOperationForLeftoverNodes(command.data, newLeftoverNodes); + await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes); } } } diff --git a/src/service/json-schema-service.js b/src/service/json-schema-service.js index 8cb8fe9401..cfc7002a07 100644 --- a/src/service/json-schema-service.js +++ b/src/service/json-schema-service.js @@ -9,7 +9,7 @@ class JsonSchemaService { } publishSchema() { - return publishSchema(this.blockchainModuleManager.getImplementationsNames()); + return publishSchema(this.blockchainModuleManager.getImplementationNames()); } getSchema() { diff --git a/src/service/sharding-table-service.js b/src/service/sharding-table-service.js new file mode 100644 index 0000000000..4fa33ca3ad --- /dev/null +++ b/src/service/sharding-table-service.js @@ -0,0 +1,210 @@ +import { + CONTRACTS, + DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS, +} from '../constants/constants.js'; + +class ShardingTableService { + constructor(ctx) { + this.config = ctx.config; + this.logger = ctx.logger; + this.blockchainModuleManager = ctx.blockchainModuleManager; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.networkModuleManager = ctx.networkModuleManager; + this.eventEmitter = ctx.eventEmitter; + } + + async initialize(blockchainId) { + await this.pullBlockchainShardingTable(blockchainId); + this.listenOnEvents(blockchainId); + const that = this; + await this.networkModuleManager.onPeerConnected((connection) => { + this.logger.trace( + `Node connected to ${connection.remotePeer.toB58String()}, updating sharding table last seen and last dialed.`, + ); + that.repositoryModuleManager + .updatePeerRecordLastSeenAndLastDialed(connection.remotePeer.toB58String()) + .catch((error) => { + this.logger.warn(`Unable to update connected peer, error: ${error.message}`); + }); + }); + } + + async pullBlockchainShardingTable(blockchainId) { + const lastCheckedBlock = await this.repositoryModuleManager.getLastCheckedBlock( + blockchainId, + CONTRACTS.SHARDING_TABLE_CONTRACT, + ); + + if ( + lastCheckedBlock?.last_checked_timestamp && + Date.now() - lastCheckedBlock.last_checked_timestamp < + DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS + ) { + return; + } + + const shardingTableLength = await this.blockchainModuleManager.getShardingTableLength( + blockchainId, + ); + let startingPeerId = await this.blockchainModuleManager.getShardingTableHead(blockchainId); + const pageSize = 10; + const shardingTable = []; + + this.logger.debug( + `Started pulling ${shardingTableLength} nodes from blockchain sharding table.`, + ); + + let sliceIndex = 0; + // TODO: mark starting block and listen to events from that block + while (shardingTable.length < shardingTableLength) { + // eslint-disable-next-line no-await-in-loop + const nodes = await this.blockchainModuleManager.getShardingTablePage( + blockchainId, + startingPeerId, + pageSize, + ); + shardingTable.push(...nodes.slice(sliceIndex).filter((node) => node.id !== '0x')); + sliceIndex = 1; + startingPeerId = nodes[nodes.length - 1].id; + } + + this.logger.debug( + `Finished pulling ${shardingTable.length} nodes from blockchain sharding table.`, + ); + + await this.repositoryModuleManager.createManyPeerRecords( + shardingTable.map((peer) => ({ + peer_id: this.blockchainModuleManager.convertHexToAscii(blockchainId, peer.id), + blockchain_id: blockchainId, + ask: peer.ask, + stake: peer.stake, + sha256: peer.idSha256, + })), + ); + } + + listenOnEvents(blockchainId) { + this.eventEmitter.on(`${blockchainId}-NodeObjCreated`, (event) => { + const eventData = JSON.parse(event.data); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain_id, + eventData.nodeId, + ); + + this.logger.trace( + `${blockchainId}-NodeObjCreated event caught, adding peer id: ${nodeId} to sharding table.`, + ); + + this.repositoryModuleManager.createPeerRecord( + nodeId, + event.blockchain_id, + eventData.ask, + eventData.stake, + new Date(0), + eventData.nodeIdSha256, + ); + + this.repositoryModuleManager.markBlockchainEventAsProcessed(event.id); + }); + + this.eventEmitter.on(`${blockchainId}-StakeUpdated`, (event) => { + const eventData = JSON.parse(event.data); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain_id, + eventData.nodeId, + ); + this.logger.trace( + `${blockchainId}-StakeUpdated event caught, updating stake value for peer id: ${nodeId} in sharding table.`, + ); + this.repositoryModuleManager.updatePeerStake(nodeId, eventData.stake); + + this.repositoryModuleManager.markBlockchainEventAsProcessed(event.id); + }); + + this.eventEmitter.on(`${blockchainId}-NodeRemoved`, (event) => { + const eventData = JSON.parse(event.data); + const nodeId = this.blockchainModuleManager.convertHexToAscii( + event.blockchain_id, + eventData.nodeId, + ); + this.logger.trace( + `${blockchainId}-NodeRemoved event caught, removing peer id: ${nodeId} from sharding table.`, + ); + this.repositoryModuleManager.removePeerRecord(nodeId); + + this.repositoryModuleManager.markBlockchainEventAsProcessed(event.id); + }); + } + + async findNeighbourhood(key, blockchainId, r2) { + const peers = await this.repositoryModuleManager.getAllPeerRecords(blockchainId); + + return this.networkModuleManager.sortPeers(key, peers, r2); + } + + async getBidSuggestion(neighbourhood, r0, higherPercentile) { + const neighbourhoodSortedByAsk = neighbourhood.sort( + (node_one, node_two) => node_one.ask < node_two.ask, + ); + + const eligibleNodes = neighbourhoodSortedByAsk.slice( + 0, + Math.ceil((higherPercentile / 100) * neighbourhood.length), + ); + + const eligibleNodesSortedByStake = eligibleNodes.sort( + (node_one, node_two) => node_one.stake > node_two.stake, + ); + + const awardedNodes = eligibleNodesSortedByStake.slice(0, r0); + + return Math.max(...awardedNodes.map((node) => node.ask)) * r0; + } + + async findEligibleNodes(neighbourhood, bid, r1, r0) { + return neighbourhood.filter((node) => node.ask <= bid / r0).slice(0, r1); + } + + async dial(peerId) { + const { addresses } = await this.findPeerAddressAndProtocols(peerId); + if (addresses.length) { + if (peerId !== this.networkModuleManager.getPeerId().toB58String()) { + this.logger.trace(`Dialing peer ${peerId}.`); + try { + await this.networkModuleManager.dial(peerId); + } catch (error) { + this.logger.trace(`Unable to dial peer ${peerId}. Error: ${error.message}`); + } + } + + await this.repositoryModuleManager.updatePeerRecordLastSeenAndLastDialed(peerId); + } else { + await this.repositoryModuleManager.updatePeerRecordLastDialed(peerId); + } + } + + async findPeerAddressAndProtocols(peerId) { + this.logger.trace(`Searching for peer ${peerId} multiaddresses in peer store.`); + let peerInfo = await this.networkModuleManager.getPeerInfo(peerId); + if ( + !peerInfo?.addresses?.length && + peerId !== this.networkModuleManager.getPeerId().toB58String() + ) { + try { + this.logger.trace(`Searching for peer ${peerId} multiaddresses on the network.`); + await this.networkModuleManager.findPeer(peerId); + peerInfo = await this.networkModuleManager.getPeerInfo(peerId); + } catch (error) { + this.logger.trace(`Unable to find peer ${peerId}. Error: ${error.message}`); + } + } + + return { + id: peerId, + addresses: peerInfo?.addresses ?? [], + protocols: peerInfo?.protocols ?? [], + }; + } +} + +export default ShardingTableService; diff --git a/test/bdd/features/get-errors.feature b/test/bdd/features/get-errors.feature index c031840c58..0af1a5eb49 100644 --- a/test/bdd/features/get-errors.feature +++ b/test/bdd/features/get-errors.feature @@ -6,21 +6,21 @@ Feature: Get errors test @get-errors Scenario: Getting non existent UAL Given I setup 4 nodes - #And I wait for 10 seconds + And I wait for 10 seconds And I call get directly to ot-node 1 with nonExistentUAL And I wait for last resolve to finalize Then Last GET operation finished with status: GetAssertionIdError - @get-errors - Scenario: GET operation result on a node with minimum replication factor greater than the number of nodes - Given I setup 4 nodes + #@get-errors + #Scenario: GET operation result on a node with minimum replication factor greater than the number of nodes + #Given I setup 4 nodes #And I wait for 10 seconds - And I call publish on node 1 with validAssertion - Then Last PUBLISH operation finished with status: COMPLETED - When I setup node 5 with minimumAckResponses.get set to 10 - #And I wait for 10 seconds - And I get operation result from node 5 for last published assertion - And I wait for last resolve to finalize - Then Last GET operation finished with status: GetNetworkError + #And I call publish on node 1 with validAssertion + #Then Last PUBLISH operation finished with status: COMPLETED + #When I setup node 5 with minimumAckResponses.get set to 10 + #And I wait for 20 seconds + #And I get operation result from node 5 for last published assertion + #And I wait for last resolve to finalize + #Then Last GET operation finished with status: GetNetworkError diff --git a/test/bdd/features/publish-errors.feature b/test/bdd/features/publish-errors.feature index d9ff81c24c..c64fc470e8 100644 --- a/test/bdd/features/publish-errors.feature +++ b/test/bdd/features/publish-errors.feature @@ -15,14 +15,14 @@ Feature: Publish errors test Scenario: Publish on a node with minimum replication factor greater than the number of nodes Given I setup 3 nodes And I setup node 4 with minimumAckResponses.publish set to 10 - #And I wait for 10 seconds + And I wait for 10 seconds And I call publish on node 4 with validAssertion Then Last PUBLISH operation finished with status: PublishStartError @publish-errors Scenario: Publish an asset directly on the node Given I setup 4 nodes - #And I wait for 10 seconds + And I wait for 10 seconds And I call publish on ot-node 4 directly with validPublishRequestBody And I wait for last publish to finalize Then Last PUBLISH operation finished with status: PublishValidateAssertionError diff --git a/test/bdd/features/release.feature b/test/bdd/features/release.feature index b1a1f3a265..c0c2307b64 100644 --- a/test/bdd/features/release.feature +++ b/test/bdd/features/release.feature @@ -6,19 +6,19 @@ Feature: Release related tests @release Scenario: Publishing a valid assertion Given I setup 4 nodes - #And I wait for 10 seconds + And I wait for 10 seconds When I call publish on node 4 with validAssertion Then Last PUBLISH operation finished with status: COMPLETED - @release - Scenario: Getting a result of the previously published assertion - Given I setup 4 nodes + #@release + #Scenario: Getting a result of the previously published assertion + #Given I setup 4 nodes #And I wait for 10 seconds - When I call publish on node 4 with validAssertion - And Last PUBLISH operation finished with status: COMPLETED - And I get operation result from node 4 for last published assertion - And Last GET operation finished with status: COMPLETED - And I setup 1 additional node - And I wait for 4 seconds - And I get operation result from node 5 for last published assertion - Then Last GET operation finished with status: COMPLETED + #When I call publish on node 4 with validAssertion + #And Last PUBLISH operation finished with status: COMPLETED + #And I get operation result from node 4 for last published assertion + #And Last GET operation finished with status: COMPLETED + #And I setup 1 additional node + #And I wait for 10 seconds + #And I get operation result from node 5 for last published assertion + #Then Last GET operation finished with status: COMPLETED diff --git a/test/bdd/steps/common.mjs b/test/bdd/steps/common.mjs index a2e8f1afb1..a29aa26327 100644 --- a/test/bdd/steps/common.mjs +++ b/test/bdd/steps/common.mjs @@ -13,7 +13,7 @@ const stepsUtils = new StepsUtils(); Given( /^I setup (\d+)[ additional]* node[s]*$/, - { timeout: 80000 }, + { timeout: 180000 }, function nodeSetup(nodeCount, done) { this.logger.log(`I setup ${nodeCount} node${nodeCount !== 1 ? 's' : ''}`); const wallets = this.state.localBlockchain.getWallets(); diff --git a/test/bdd/steps/lib/local-blockchain.mjs b/test/bdd/steps/lib/local-blockchain.mjs index 3ca497e319..a52840c31e 100644 --- a/test/bdd/steps/lib/local-blockchain.mjs +++ b/test/bdd/steps/lib/local-blockchain.mjs @@ -4,6 +4,9 @@ import Web3 from 'web3'; import { readFile } from 'fs/promises'; const hub = JSON.parse(await readFile('node_modules/dkg-evm-module/build/contracts/Hub.json')); +const shardingTable = JSON.parse( + await readFile('node_modules/dkg-evm-module/build/contracts/ShardingTable.json'), +); const uaiRegistry = JSON.parse( await readFile('node_modules/dkg-evm-module/build/contracts/UAIRegistry.json'), ); @@ -28,6 +31,7 @@ const accountPrivateKeys = JSON.parse( const sources = { hub, + shardingTable, uaiRegistry, assertionRegistry, assetRegistry, @@ -103,6 +107,9 @@ class LocalBlockchain { this.logger.info( `\t Hub contract address: \t\t\t\t\t${this.contracts.hub.instance._address}`, ); + this.logger.info( + `\t Sharding table contract address: \t\t\t${this.contracts.shardingTable.instance._address}`, + ); this.logger.info( `\t AssertionRegistry contract address: \t\t\t${this.contracts.assertionRegistry.instance._address}`, ); @@ -149,6 +156,13 @@ class LocalBlockchain { await this.deploy('hub', deployingWallet, []); await this.setContractAddress('Owner', deployingWallet.address, deployingWallet); + await this.deploy('shardingTable', deployingWallet, [this.contracts.hub.instance._address]); + await this.setContractAddress( + 'ShardingTable', + this.contracts.shardingTable.instance._address, + deployingWallet, + ); + await this.deploy('uaiRegistry', deployingWallet, [this.contracts.hub.instance._address]); await this.setContractAddress( 'UAIRegistry', @@ -159,7 +173,6 @@ class LocalBlockchain { await this.deploy('assertionRegistry', deployingWallet, [ this.contracts.hub.instance._address, ]); - await this.setContractAddress( 'AssertionRegistry', this.contracts.assertionRegistry.instance._address, @@ -167,19 +180,16 @@ class LocalBlockchain { ); await this.deploy('assetRegistry', deployingWallet, [this.contracts.hub.instance._address]); - await this.setContractAddress( 'AssetRegistry', this.contracts.assetRegistry.instance._address, deployingWallet, ); - await this.setupRole( this.contracts.uaiRegistry, this.contracts.assetRegistry.instance._address, ); - // this.logger.log('Deploying profileStorageContract'); await this.deploy('profileStorage', deployingWallet, [ this.contracts.hub.instance._address, ]); @@ -190,7 +200,6 @@ class LocalBlockchain { ); await this.deploy('erc20Token', deployingWallet, [this.contracts.hub.instance._address]); - await this.setContractAddress( 'Token', this.contracts.erc20Token.instance._address, @@ -199,7 +208,6 @@ class LocalBlockchain { await this.setupRole(this.contracts.erc20Token, deployingWallet.address); await this.deploy('profile', deployingWallet, [this.contracts.hub.instance._address]); - await this.setContractAddress( 'Profile', this.contracts.profile.instance._address,