diff --git a/config/config.json b/config/config.json index 3653858bd1..f7997c8051 100644 --- a/config/config.json +++ b/config/config.json @@ -152,7 +152,7 @@ } } }, - "maximumAssertionSizeInKb": 2500, + "maximumAssertionSizeInKb": 10000, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "info", @@ -305,7 +305,7 @@ } } }, - "maximumAssertionSizeInKb": 2500, + "maximumAssertionSizeInKb": 10000, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", @@ -471,7 +471,7 @@ } } }, - "maximumAssertionSizeInKb": 2500, + "maximumAssertionSizeInKb": 10000, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", @@ -637,7 +637,7 @@ } } }, - "maximumAssertionSizeInKb": 2500, + "maximumAssertionSizeInKb": 10000, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", @@ -803,7 +803,7 @@ } } }, - "maximumAssertionSizeInKb": 2500, + "maximumAssertionSizeInKb": 10000, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", diff --git a/installer/installer.sh b/installer/installer.sh index 80d58a2480..364dad676f 100755 --- a/installer/installer.sh +++ b/installer/installer.sh @@ -252,7 +252,7 @@ install_sql() { request_operational_wallet_keys() { WALLET_ADDRESSES=() WALLET_PRIVATE_KEYS=() - + echo "You'll now be asked to input addresses and private keys of your operational wallets for $1. Input an empty value to stop." wallet_no=1 while true; do @@ -263,7 +263,7 @@ request_operational_wallet_keys() { read -p "Please input the private key for your $1 operational wallet no. $wallet_no:" private_key [[ -z $private_key ]] && break text_color $GREEN "EVM operational wallet private key for $blockchain wallet no. $wallet_no: $private_key" - + WALLET_ADDRESSES+=($address) WALLET_PRIVATE_KEYS+=($private_key) wallet_no=$((wallet_no + 1)) @@ -271,7 +271,7 @@ request_operational_wallet_keys() { OP_WALLET_KEYS_JSON=$(jq -n ' [ - $ARGS.positional as $args + $ARGS.positional as $args | ($args | length / 2) as $upto | range(0; $upto) as $start | [{ evmAddress: $args[$start], privateKey: $args[$start + $upto] }] @@ -302,6 +302,7 @@ elif [ "$nodeEnv" == "testnet" ]; then blockchain_prompt=("OriginTrail Parachain" "Gnosis" "Both") fi + # Ask user which blockchain to connect to with strict input validation while true; do read -p "Please select the blockchain you want to connect your node to: @@ -319,9 +320,33 @@ while true; do esac done + +# Function to validate Operator Fees input +validate_operator_fees() { + local blockchain=$1 + local operator_fee_variable="${blockchain}_OPERATOR_FEES" + + read -p "Enter Operator Fees (0% - 100%) for $blockchain: " OPERATOR_FEES + if (( OPERATOR_FEES >= 0 && OPERATOR_FEES <= 100 )); then + text_color $GREEN "Operator Fees for $blockchain: $OPERATOR_FEES" + eval "${operator_fee_variable}=$OPERATOR_FEES" + else + text_color $RED "Please enter Operator Fees in the range of 0% - 100%. Try again." 
+ validate_operator_fees $blockchain + fi +} + + # Case statement to handle blockchain-specific configurations case "$blockchain" in "OriginTrail Parachain" | "Gnosis" ) + + + if [ "$blockchain" == "OriginTrail Parachain" ]; then + blockchain="OTP" + fi + + # Input wallets for the selected blockchain request_operational_wallet_keys $blockchain EVM_OP_WALLET_KEYS_BLOCKCHAIN=$OP_WALLET_KEYS_JSON @@ -335,9 +360,13 @@ case "$blockchain" in read -p "Enter your profile shares token symbol for $blockchain: " SHARES_TOKEN_SYMBOL text_color $GREEN "Profile shares token symbol for $blockchain: $SHARES_TOKEN_SYMBOL" + # Prompt and validate Operator Fees for the first blockchain + validate_operator_fees $blockchain + eval "OPERATOR_FEE=\$${blockchain}_OPERATOR_FEES" + if [ "$blockchain" == "Gnosis" ]; then read -p "Enter your Gnosis RPC endpoint: " GNOSIS_RPC_ENDPOINT - text_color $GREEN "Gnosis RPC endpoint: $GNOSIS_RPC_ENDPOINT" + text_color $GREEN "Gnosis RPC endpoint: $GNOSIS_RPC_ENDPOINT" fi @@ -364,6 +393,10 @@ case "$blockchain" in read -p "Enter your profile shares token symbol for $blockchain1: " SHARES_TOKEN_SYMBOL text_color $GREEN "Profile shares token symbol for $blockchain1: $SHARES_TOKEN_SYMBOL" + # Prompt and validate Operator Fees for the first blockchain + validate_operator_fees $blockchain1 + OPERATOR_FEES_1=$OTP_OPERATOR_FEES + # Input wallets for the second blockchain request_operational_wallet_keys $blockchain2 EVM_OP_WALLET_KEYS_BLOCKCHAIN2=$OP_WALLET_KEYS_JSON @@ -377,6 +410,11 @@ case "$blockchain" in read -p "Enter your profile shares token symbol for $blockchain2: " SHARES_TOKEN_SYMBOL_2 text_color $GREEN "Profile shares token symbol for $blockchain2: $SHARES_TOKEN_SYMBOL_2" + # Prompt and validate Operator Fees for the second blockchain + validate_operator_fees $blockchain2 + OPERATOR_FEES_2=$Gnosis_OPERATOR_FEES + + read -p "Enter your Gnosis RPC endpoint: " GNOSIS_RPC_ENDPOINT text_color $GREEN "Gnosis RPC endpoint: $GNOSIS_RPC_ENDPOINT" @@ -386,6 +424,7 @@ case "$blockchain" in exit;; esac + perform_step npm ci --omit=dev --ignore-scripts "Executing npm install" CONFIG_DIR=$OTNODE_DIR/.. 
@@ -439,7 +478,7 @@ fi # Check if "Both" blockchains are selected if [ "$blockchain" == "Both" ]; then - perform_step $(jq --arg otp_blockchain_id "$otp_blockchain_id" --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN1 "$EVM_OP_WALLET_KEYS_BLOCKCHAIN1" --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN2 "$EVM_OP_WALLET_KEYS_BLOCKCHAIN2" --arg EVM_MANAGEMENT_WALLET "$EVM_MANAGEMENT_WALLET" --arg SHARES_TOKEN_NAME "$SHARES_TOKEN_NAME" --arg SHARES_TOKEN_SYMBOL "$SHARES_TOKEN_SYMBOL" --arg gnosis_blockchain_id "$gnosis_blockchain_id" --arg EVM_OPERATIONAL_WALLET_2 "$EVM_OPERATIONAL_WALLET_2" --arg EVM_OPERATIONAL_PRIVATE_KEY_2 "$EVM_OPERATIONAL_PRIVATE_KEY_2" --arg EVM_MANAGEMENT_WALLET_2 "$EVM_MANAGEMENT_WALLET_2" --arg SHARES_TOKEN_NAME_2 "$SHARES_TOKEN_NAME_2" --arg SHARES_TOKEN_SYMBOL_2 "$SHARES_TOKEN_SYMBOL_2" --arg GNOSIS_RPC_ENDPOINT "$GNOSIS_RPC_ENDPOINT" ' + perform_step $(jq --arg otp_blockchain_id "$otp_blockchain_id" --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN1 "$EVM_OP_WALLET_KEYS_BLOCKCHAIN1" --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN2 "$EVM_OP_WALLET_KEYS_BLOCKCHAIN2" --arg EVM_MANAGEMENT_WALLET "$EVM_MANAGEMENT_WALLET" --arg SHARES_TOKEN_NAME "$SHARES_TOKEN_NAME" --arg SHARES_TOKEN_SYMBOL "$SHARES_TOKEN_SYMBOL" --argjson OPERATOR_FEES_1 "$OPERATOR_FEES_1" --argjson OPERATOR_FEES_2 "$OPERATOR_FEES_2" --arg gnosis_blockchain_id "$gnosis_blockchain_id" --arg EVM_OPERATIONAL_WALLET_2 "$EVM_OPERATIONAL_WALLET_2" --arg EVM_OPERATIONAL_PRIVATE_KEY_2 "$EVM_OPERATIONAL_PRIVATE_KEY_2" --arg EVM_MANAGEMENT_WALLET_2 "$EVM_MANAGEMENT_WALLET_2" --arg SHARES_TOKEN_NAME_2 "$SHARES_TOKEN_NAME_2" --arg SHARES_TOKEN_SYMBOL_2 "$SHARES_TOKEN_SYMBOL_2" --arg GNOSIS_RPC_ENDPOINT "$GNOSIS_RPC_ENDPOINT" ' .modules.blockchain.implementation += { "otp:'$otp_blockchain_id'": { "enabled": true, @@ -447,7 +486,8 @@ if [ "$blockchain" == "Both" ]; then "operationalWallets": $EVM_OP_WALLET_KEYS_BLOCKCHAIN1, "evmManagementWalletPublicKey": $EVM_MANAGEMENT_WALLET, "sharesTokenName": $SHARES_TOKEN_NAME, - "sharesTokenSymbol": $SHARES_TOKEN_SYMBOL + "sharesTokenSymbol": $SHARES_TOKEN_SYMBOL, + "operatorFee": $OPERATOR_FEES_1 } }, "gnosis:'$gnosis_blockchain_id'": { @@ -457,14 +497,16 @@ if [ "$blockchain" == "Both" ]; then "evmManagementWalletPublicKey": $EVM_MANAGEMENT_WALLET_2, "sharesTokenName": $SHARES_TOKEN_NAME_2, "sharesTokenSymbol": $SHARES_TOKEN_SYMBOL_2, - "rpcEndpoints": [$GNOSIS_RPC_ENDPOINT] + "operatorFee": $OPERATOR_FEES_2, + "rpcEndpoints": [$GNOSIS_RPC_ENDPOINT] } } }' $CONFIG_DIR/.origintrail_noderc > $CONFIG_DIR/origintrail_noderc_tmp) "Adding node wallets to node config file 1/2 for Both" - else + + # Single blockchain selected - if [ "$blockchain" = "OriginTrail Parachain" ]; then + if [ "$blockchain" = "OriginTrail Parachain" ] || [ "$blockchain" = "OTP" ]; then blockchain="otp" blockchain_id="$otp_blockchain_id" elif [ "$blockchain" = "Gnosis" ]; then @@ -476,29 +518,30 @@ else ADD_GNOSIS_RPC="true" fi - blockchain_arg="$blockchain:$blockchain_id" -jq --arg blockchain_arg "$blockchain_arg" \ - --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN "$EVM_OP_WALLET_KEYS_BLOCKCHAIN" \ - --arg EVM_MANAGEMENT_WALLET "$EVM_MANAGEMENT_WALLET" \ - --arg SHARES_TOKEN_NAME "$SHARES_TOKEN_NAME" \ - --arg SHARES_TOKEN_SYMBOL "$SHARES_TOKEN_SYMBOL" \ - --argjson ADD_GNOSIS_RPC "$ADD_GNOSIS_RPC" \ - --arg GNOSIS_RPC_ENDPOINT "$GNOSIS_RPC_ENDPOINT" ' -(.modules.blockchain.implementation += { - ($blockchain_arg): { - "enabled": true, - "config": { - "operationalWallets": $EVM_OP_WALLET_KEYS_BLOCKCHAIN, - "evmManagementWalletPublicKey": 
$EVM_MANAGEMENT_WALLET, - "sharesTokenName": $SHARES_TOKEN_NAME, - "sharesTokenSymbol": $SHARES_TOKEN_SYMBOL - } - } -}) | if $ADD_GNOSIS_RPC then .blockchain.implementation[$blockchain_arg].config += {"rpcEndpoints": [$GNOSIS_RPC_ENDPOINT]} else . end -' "$CONFIG_DIR/.origintrail_noderc" > "$CONFIG_DIR/origintrail_noderc_tmp" + jq --arg blockchain_arg "$blockchain_arg" \ + --argjson EVM_OP_WALLET_KEYS_BLOCKCHAIN "$EVM_OP_WALLET_KEYS_BLOCKCHAIN" \ + --arg EVM_MANAGEMENT_WALLET "$EVM_MANAGEMENT_WALLET" \ + --arg SHARES_TOKEN_NAME "$SHARES_TOKEN_NAME" \ + --arg SHARES_TOKEN_SYMBOL "$SHARES_TOKEN_SYMBOL" \ + --argjson ADD_GNOSIS_RPC "$ADD_GNOSIS_RPC" \ + --argjson OPERATOR_FEE $OPERATOR_FEE \ + --arg GNOSIS_RPC_ENDPOINT "$GNOSIS_RPC_ENDPOINT" ' + (.modules.blockchain.implementation += { + ($blockchain_arg): { + "enabled": true, + "config": { + "operationalWallets": $EVM_OP_WALLET_KEYS_BLOCKCHAIN, + "evmManagementWalletPublicKey": $EVM_MANAGEMENT_WALLET, + "sharesTokenName": $SHARES_TOKEN_NAME, + "sharesTokenSymbol": $SHARES_TOKEN_SYMBOL, + "operatorFee": $OPERATOR_FEE + } + } + }) | if $ADD_GNOSIS_RPC then .modules.blockchain.implementation[$blockchain_arg].config += {"rpcEndpoints": [$GNOSIS_RPC_ENDPOINT]} else . end + ' "$CONFIG_DIR/.origintrail_noderc" > "$CONFIG_DIR/origintrail_noderc_tmp" fi perform_step mv $CONFIG_DIR/origintrail_noderc_tmp $CONFIG_DIR/.origintrail_noderc "Adding node wallets to node config file 2/2" diff --git a/ot-node.js b/ot-node.js index 7d2f358312..05aab1138f 100644 --- a/ot-node.js +++ b/ot-node.js @@ -12,6 +12,7 @@ import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-upd import MigrationExecutor from './src/migration/migration-executor.js'; const require = createRequire(import.meta.url); +const { setTimeout } = require('timers/promises'); const pjson = require('./package.json'); const configjson = require('./config/config.json'); @@ -220,6 +221,7 @@ class OTNode { `npm run set-stake -- --rpcEndpoint=${blockchainConfig.rpcEndpoints[0]} --stake=${blockchainConfig.initialStakeAmount} --operationalWalletPrivateKey=${blockchainConfig.operationalWallets[0].privateKey} --managementWalletPrivateKey=${blockchainConfig.evmManagementWalletPrivateKey} --hubContractAddress=${blockchainConfig.hubContractAddress}`, { stdio: 'inherit' }, ); + await setTimeout(10000); execSync( `npm run set-ask -- --rpcEndpoint=${ blockchainConfig.rpcEndpoints[0] diff --git a/package-lock.json b/package-lock.json index 64b3698161..fa57218fd3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.2.0", + "version": "6.2.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.2.0", + "version": "6.2.1", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index e6c070c571..6b30c31598 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.2.0", + "version": "6.2.1", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/protocols/common/epoch-check-command.js b/src/commands/protocols/common/epoch-check-command.js index 679d8157c1..4d48e32457 100644 --- a/src/commands/protocols/common/epoch-check-command.js +++ b/src/commands/protocols/common/epoch-check-command.js @@ -6,9 +6,9 @@ import { TRANSACTION_CONFIRMATIONS, OPERATION_ID_STATUS, ERROR_TYPE, - NODE_ENVIRONMENTS, + TRIPLE_STORE_REPOSITORIES, + 
SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS, } from '../../../constants/constants.js'; -import MigrationExecutor from '../../../migration/migration-executor.js'; class EpochCheckCommand extends Command { constructor(ctx) { @@ -22,25 +22,12 @@ class EpochCheckCommand extends Command { this.fileService = ctx.fileService; this.proximityScoringService = ctx.proximityScoringService; this.hashingService = ctx.hashingService; + this.tripleStoreService = ctx.tripleStoreService; this.errorType = ERROR_TYPE.COMMIT_PROOF.EPOCH_CHECK_ERROR; } async execute(command) { - const migrationExecuted = await MigrationExecutor.migrationAlreadyExecuted( - 'ualExtensionTripleStoreMigration', - this.fileService, - ); - if ( - process.env.NODE_ENV !== NODE_ENVIRONMENTS.DEVELOPMENT && - process.env.NODE_ENV !== NODE_ENVIRONMENTS.TEST && - !migrationExecuted - ) { - this.logger.info( - 'Epoch check: command will be postponed until ual extension triple store migration is completed', - ); - return Command.repeat(); - } this.logger.info('Epoch check: Starting epoch check command'); const operationId = this.operationIdService.generateId(); @@ -122,12 +109,20 @@ class EpochCheckCommand extends Command { minStake, maxStake, ) { + const peerRecord = await this.repositoryModuleManager.getPeerRecord( + this.networkModuleManager.getPeerId().toB58String(), + blockchain, + ); + + const ask = this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); + const timestamp = await this.blockchainModuleManager.getBlockchainTimestamp(blockchain); const eligibleAgreementForSubmitCommit = await this.repositoryModuleManager.getEligibleAgreementsForSubmitCommit( timestamp, blockchain, commitWindowDurationPerc, + SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS, ); this.logger.info( `Epoch check: Found ${eligibleAgreementForSubmitCommit.length} eligible agreements for submit commit for blockchain: ${blockchain}`, @@ -207,6 +202,45 @@ class EpochCheckCommand extends Command { continue; } + // If proof was ever sent = data is present in the Triple Store + let isAssetSynced = Boolean(serviceAgreement.lastProofEpoch); + if (!isAssetSynced) { + // Else: check Public Current Repository + isAssetSynced = await this.tripleStoreService.assertionExists( + TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, + serviceAgreement.assertionId, + ); + } + + // If data is not in the Triple Store, check if ask satisfied + if (!isAssetSynced) { + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + serviceAgreement.agreementId, + ); + const blockchainAssertionSize = + await this.blockchainModuleManager.getAssertionSize( + blockchain, + serviceAgreement.assertionId, + ); + + const serviceAgreementBid = await this.serviceAgreementService.calculateBid( + blockchain, + blockchainAssertionSize, + agreementData, + r0, + ); + + if (serviceAgreementBid.lt(ask)) { + this.logger.trace( + `Epoch check: Ask (${ask.toString()} wei) isn't satisfied by the bid (${serviceAgreementBid.toString()} wei) for the Service Agreement with the ID: ${ + serviceAgreement.agreementId + }. 
Skipping scheduling submitCommitCommand for blockchain: ${blockchain}`, + ); + continue; + } + } + this.logger.trace( `Epoch check: Calculated rank: ${ rank + 1 @@ -220,6 +254,7 @@ class EpochCheckCommand extends Command { serviceAgreement, neighbourhoodEdges, closestNode, + isAssetSynced, ), ); } catch (error) { @@ -319,7 +354,7 @@ class EpochCheckCommand extends Command { return false; } - async scheduleSubmitCommitCommand(agreement, neighbourhoodEdges, closestNode) { + async scheduleSubmitCommitCommand(agreement, neighbourhoodEdges, closestNode, isAssetSynced) { const commandData = { operationId: this.operationIdService.generateId(), blockchain: agreement.blockchainId, @@ -329,19 +364,30 @@ class EpochCheckCommand extends Command { hashFunctionId: agreement.hashFunctionId, epoch: agreement.currentEpoch, agreementId: agreement.agreementId, + assertionId: agreement.assertionId, stateIndex: agreement.stateIndex, closestNode: closestNode.index, leftNeighborhoodEdge: neighbourhoodEdges?.leftEdge.index, rightNeighborhoodEdge: neighbourhoodEdges?.rightEdge.index, }; - await this.commandExecutor.add({ - name: 'submitCommitCommand', - sequence: [], - retries: COMMAND_RETRIES.SUBMIT_COMMIT, - data: commandData, - transactional: false, - }); + if (isAssetSynced) { + await this.commandExecutor.add({ + name: 'submitCommitCommand', + sequence: [], + retries: COMMAND_RETRIES.SUBMIT_COMMIT, + data: commandData, + transactional: false, + }); + } else { + await this.commandExecutor.add({ + name: 'simpleAssetSyncCommand', + sequence: ['submitCommitCommand'], + retries: COMMAND_RETRIES.SIMPLE_ASSET_SYNC, + data: commandData, + transactional: false, + }); + } } async scheduleSubmitProofsCommand(agreement) { diff --git a/src/commands/protocols/common/handle-protocol-message-command.js b/src/commands/protocols/common/handle-protocol-message-command.js index 65855c51ae..223a5e472d 100644 --- a/src/commands/protocols/common/handle-protocol-message-command.js +++ b/src/commands/protocols/common/handle-protocol-message-command.js @@ -97,7 +97,7 @@ class HandleProtocolMessageCommand extends Command { operationId, ) { const geAgreementData = async () => { - const agreementId = await this.serviceAgreementService.generateId( + const agreementId = this.serviceAgreementService.generateId( blockchain, contract, tokenId, @@ -108,12 +108,14 @@ class HandleProtocolMessageCommand extends Command { `Calculated agreement id: ${agreementId} for contract: ${contract}, token id: ${tokenId}, keyword: ${keyword}, hash function id: ${hashFunctionId}, operationId: ${operationId}`, ); + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + agreementId, + ); + return { agreementId, - agreementData: await this.blockchainModuleManager.getAgreementData( - blockchain, - agreementId, - ), + agreementData, }; }; @@ -123,9 +125,7 @@ class HandleProtocolMessageCommand extends Command { blockchain, ); - const ask = this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); - - return this.blockchainModuleManager.toBigNumber(blockchain, ask); + return this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); }; const [{ agreementId, agreementData }, blockchainAssertionSize, r0, ask] = @@ -148,27 +148,13 @@ class HandleProtocolMessageCommand extends Command { }; } - const now = await this.blockchainModuleManager.getBlockchainTimestamp(blockchain); - - // todo: use shared function with epoch commands - const currentEpoch = Math.floor( - (Number(now) - Number(agreementData.startTime)) / 
Number(agreementData.epochLength), + const serviceAgreementBid = await this.serviceAgreementService.calculateBid( + blockchain, + blockchainAssertionSize, + agreementData, + r0, ); - // todo: consider optimizing to take into account cases where some proofs have already been submitted - const epochsLeft = Number(agreementData.epochsNumber) - currentEpoch; - - const divisor = this.blockchainModuleManager - .toBigNumber(blockchain, r0) - .mul(epochsLeft) - .mul(blockchainAssertionSize); - - const serviceAgreementBid = agreementData.tokenAmount - .add(agreementData.updateTokenAmount) - .mul(1024) - .div(divisor) - .add(1); // add 1 wei because of the precision loss - const bidAskLog = `Service agreement bid: ${serviceAgreementBid}, ask: ${ask}, operationId: ${operationId}`; this.logger.trace(bidAskLog); diff --git a/src/commands/protocols/common/network-protocol-command.js b/src/commands/protocols/common/network-protocol-command.js index 8bbf2affbc..6c3aab9c15 100644 --- a/src/commands/protocols/common/network-protocol-command.js +++ b/src/commands/protocols/common/network-protocol-command.js @@ -19,7 +19,7 @@ class NetworkProtocolCommand extends Command { const batchSize = await this.getBatchSize(blockchain); const minAckResponses = await this.getMinAckResponses(blockchain); - const serviceAgreementId = await this.serviceAgreementService.generateId( + const serviceAgreementId = this.serviceAgreementService.generateId( blockchain, contract, tokenId, diff --git a/src/commands/protocols/common/simple-asset-sync-command.js b/src/commands/protocols/common/simple-asset-sync-command.js new file mode 100644 index 0000000000..dd580c1c74 --- /dev/null +++ b/src/commands/protocols/common/simple-asset-sync-command.js @@ -0,0 +1,191 @@ +import { setTimeout } from 'timers/promises'; +import Command from '../../command.js'; +import { + COMMAND_RETRIES, + ERROR_TYPE, + OPERATION_ID_STATUS, + OPERATION_STATUS, + SIMPLE_ASSET_SYNC_PARAMETERS, +} from '../../../constants/constants.js'; + +class SimpleAssetSyncCommand extends Command { + constructor(ctx) { + super(ctx); + this.tripleStoreService = ctx.tripleStoreService; + this.ualService = ctx.ualService; + this.operationIdService = ctx.operationIdService; + this.getService = ctx.getService; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.commandExecutor = ctx.commandExecutor; + + this.errorType = ERROR_TYPE.COMMIT_PROOF.SIMPLE_ASSET_SYNC_ERROR; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute(command) { + const { + operationId, + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + epoch, + assertionId, + stateIndex, + } = command.data; + + await this.operationIdService.updateOperationIdStatus( + operationId, + blockchain, + OPERATION_ID_STATUS.COMMIT_PROOF.SIMPLE_ASSET_SYNC_START, + ); + + this.logger.info( + `[SIMPLE_ASSET_SYNC] (${operationId}): Started command for the ` + + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + + `State Index: ${stateIndex}, Retry number: ${ + COMMAND_RETRIES.SIMPLE_ASSET_SYNC - command.retries + 1 + }`, + ); + + const ual = this.ualService.deriveUAL(blockchain, contract, tokenId); + + const getOperationId = await this.operationIdService.generateOperationId( + OPERATION_ID_STATUS.GET.GET_START, + ); + + this.logger.debug( + `[SIMPLE_ASSET_SYNC] (${operationId}): Fetching Knowledge Asset from the network for the ` + + `Blockchain: 
${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}`, + ); + + await Promise.all([ + this.operationIdService.updateOperationIdStatus( + getOperationId, + blockchain, + OPERATION_ID_STATUS.GET.GET_INIT_START, + ), + this.repositoryModuleManager.createOperationRecord( + this.getService.getOperationName(), + getOperationId, + OPERATION_STATUS.IN_PROGRESS, + ), + ]); + + let getResult; + + try { + await this.commandExecutor.add({ + name: 'networkGetCommand', + sequence: [], + delay: 0, + data: { + operationId: getOperationId, + id: ual, + blockchain, + contract, + tokenId, + state: assertionId, + hashFunctionId, + assertionId, + stateIndex, + assetSync: true, + }, + transactional: false, + }); + + await this.operationIdService.updateOperationIdStatus( + getOperationId, + blockchain, + OPERATION_ID_STATUS.GET.GET_INIT_END, + ); + + let attempt = 0; + do { + // eslint-disable-next-line no-await-in-loop + await setTimeout(SIMPLE_ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS); + + // eslint-disable-next-line no-await-in-loop + getResult = await this.operationIdService.getOperationIdRecord(getOperationId); + attempt += 1; + } while ( + attempt < SIMPLE_ASSET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS && + getResult?.status !== OPERATION_ID_STATUS.FAILED && + getResult?.status !== OPERATION_ID_STATUS.COMPLETED + ); + } catch (error) { + this.logger.warn( + `[SIMPLE_ASSET_SYNC] (${operationId}): Unable to sync Knowledge Asset for the ` + + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}, ` + + `Operation failed with error: ${error}.`, + ); + + return Command.retry(); + } + + await this.operationIdService.updateOperationIdStatus( + operationId, + blockchain, + OPERATION_ID_STATUS.COMMIT_PROOF.SIMPLE_ASSET_SYNC_END, + ); + + if (getResult?.status === OPERATION_ID_STATUS.COMPLETED) { + this.logger.info( + `[SIMPLE_ASSET_SYNC] (${operationId}): Successfully executed command for the ` + + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}, `, + ); + + return this.continueSequence(command.data, command.sequence, { + retries: COMMAND_RETRIES.SUBMIT_COMMIT, + }); + } + + this.logger.warn( + `[SIMPLE_ASSET_SYNC] (${operationId}): Failed to execute command. Couldn't find asset on the network for the ` + + `Blockchain: ${blockchain}, Contract: ${contract}, Token ID: ${tokenId}, ` + + `Keyword: ${keyword}, Hash function ID: ${hashFunctionId}, Epoch: ${epoch}, ` + + `State Index: ${stateIndex}, Network Get Operation ID: ${getOperationId}, `, + ); + } + + async retryFinished(command) { + const { blockchain, contract, tokenId, operationId } = command.data; + const ual = this.ualService.deriveUAL(blockchain, contract, tokenId); + await this.handleError( + operationId, + blockchain, + `Max retry count for the ${command.name} reached! 
` + `Unable to sync Knowledge Asset on the ${blockchain} blockchain with the UAL: ${ual}`, + this.errorType, + true, + ); + } + + /** + * Builds default simpleAssetSyncCommand + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'simpleAssetSyncCommand', + delay: 0, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +export default SimpleAssetSyncCommand; diff --git a/src/commands/protocols/get/sender/get-assertion-id-command.js b/src/commands/protocols/get/sender/get-assertion-id-command.js index b4773ce456..f21a985688 100644 --- a/src/commands/protocols/get/sender/get-assertion-id-command.js +++ b/src/commands/protocols/get/sender/get-assertion-id-command.js @@ -1,5 +1,10 @@ import Command from '../../../command.js'; -import { ERROR_TYPE, GET_STATES, ZERO_BYTES32 } from '../../../../constants/constants.js'; +import { + ERROR_TYPE, + GET_STATES, + PENDING_STORAGE_REPOSITORIES, + ZERO_BYTES32, +} from '../../../../constants/constants.js'; class GetAssertionIdCommand extends Command { constructor(ctx) { @@ -7,6 +12,9 @@ class GetAssertionIdCommand extends Command { this.operationService = ctx.getService; this.blockchainModuleManager = ctx.blockchainModuleManager; this.ualService = ctx.ualService; + this.serviceAgreementService = ctx.serviceAgreementService; + this.pendingStorageService = ctx.pendingStorageService; + this.repositoryModuleManager = ctx.repositoryModuleManager; this.errorType = ERROR_TYPE.GET.GET_ASSERTION_ID_ERROR; } @@ -16,8 +24,10 @@ * @param command */ async execute(command) { - const { operationId, blockchain, contract, tokenId, state } = command.data; - + const { operationId, blockchain, contract, tokenId, state, hashFunctionId } = command.data; + this.logger.info( + `Getting assertion id for token id: ${tokenId}, contract: ${contract}, state: ${state}, hash function id: ${hashFunctionId}, operation id: ${operationId} on blockchain: ${blockchain}`, + ); let assertionId; if (!Object.values(GET_STATES).includes(state)) { if (state === ZERO_BYTES32) { @@ -31,10 +41,19 @@ return Command.empty(); } - const pendingState = await this.blockchainModuleManager.getUnfinalizedAssertionId( + let pendingState; + pendingState = await this.pendingStorageService.getPendingState( + PENDING_STORAGE_REPOSITORIES.PUBLIC, blockchain, + contract, tokenId, ); + if (!pendingState) { + pendingState = await this.blockchainModuleManager.getUnfinalizedAssertionId( + blockchain, + tokenId, + ); + } if ( state !== pendingState && @@ -49,7 +68,7 @@ await this.handleError( operationId, blockchain, - `The provided state: ${state} does not exist on the ${blockchain} blockchain, ` + `within contract: ${contract}, for the Knowledge Asset with tokenId: ${tokenId}.`, + `The provided state: ${state} does not exist on the ${blockchain} blockchain, ` + `within contract: ${contract}, for the Knowledge Asset with tokenId: ${tokenId}, operation id: ${operationId}.`, this.errorType, ); @@ -59,27 +78,103 @@ assertionId = state; } else { this.logger.debug( - `Searching for latest assertion id on ${blockchain} on contract: ${contract} with tokenId: ${tokenId}`, + `Searching for latest assertion id on ${blockchain} on contract: ${contract} with tokenId: ${tokenId}, operation id: ${operationId}`, + ); + + const assertionIds = await 
this.blockchainModuleManager.getAssertionIds( + blockchain, + contract, + tokenId, ); + const latestFinalizedAssertionId = assertionIds[assertionIds.length - 1]; + if (state === GET_STATES.LATEST) { - assertionId = await this.blockchainModuleManager.getUnfinalizedAssertionId( - blockchain, - tokenId, - ); - } - if (assertionId == null || assertionId === ZERO_BYTES32) { - assertionId = await this.blockchainModuleManager.getLatestAssertionId( + let unfinalizedAssertionId; + unfinalizedAssertionId = await this.pendingStorageService.getPendingState( + PENDING_STORAGE_REPOSITORIES.PUBLIC, blockchain, contract, tokenId, ); + if (!unfinalizedAssertionId) { + unfinalizedAssertionId = + await this.blockchainModuleManager.getUnfinalizedAssertionId( + blockchain, + tokenId, + ); + } + if (unfinalizedAssertionId !== ZERO_BYTES32) { + const updateCommitWindowOpen = await this.isUpdateCommitWindowOpen( + blockchain, + contract, + tokenId, + hashFunctionId, + assertionIds, + ); + if (updateCommitWindowOpen) { + assertionId = unfinalizedAssertionId; + this.logger.warn( + `Commit update window open for tokenId: ${tokenId}, using unfinalized assertion id: ${assertionId} for operation id: ${operationId}`, + ); + } else { + assertionId = latestFinalizedAssertionId; + this.logger.warn( + `Commit update window closed for tokenId: ${tokenId}, latest assertion id will be used instead of unfinalized for operation id: ${operationId}`, + ); + } + } + } + if (assertionId === null || assertionId === ZERO_BYTES32 || assertionId === undefined) { + assertionId = latestFinalizedAssertionId; } } - + this.logger.info( + `Found assertion id: ${assertionId} for token id: ${tokenId}, contract: ${contract} on blockchain: ${blockchain} for operation id: ${operationId}`, + ); return this.continueSequence({ ...command.data, state, assertionId }, command.sequence); } + async isUpdateCommitWindowOpen(blockchain, contract, tokenId, hashFunctionId, assertionIds) { + const keyword = await this.ualService.calculateLocationKeyword( + blockchain, + contract, + tokenId, + assertionIds[0], + ); + + const agreementId = this.serviceAgreementService.generateId( + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + ); + const latestStateIndex = assertionIds.length; + + let agreementData; + agreementData = await this.repositoryModuleManager.getServiceAgreementRecord(agreementId); + if (!agreementData) { + agreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + agreementId, + ); + } + + const epoch = await this.serviceAgreementService.calculateCurrentEpoch( + agreementData.startTime, + agreementData.epochLength, + blockchain, + ); + + return this.blockchainModuleManager.isUpdateCommitWindowOpen( + blockchain, + agreementId, + epoch, + latestStateIndex, + ); + } + async handleError(operationId, blockchain, errorMessage, errorType) { await this.operationService.markOperationAsFailed( operationId, diff --git a/src/commands/protocols/get/sender/get-schedule-messages-command.js b/src/commands/protocols/get/sender/get-schedule-messages-command.js index ece739265c..fdcaa750be 100644 --- a/src/commands/protocols/get/sender/get-schedule-messages-command.js +++ b/src/commands/protocols/get/sender/get-schedule-messages-command.js @@ -11,7 +11,11 @@ class GetScheduleMessagesCommand extends ProtocolScheduleMessagesCommand { } getNextCommandData(command) { - return { ...super.getNextCommandData(command), state: command.data.state }; + return { + ...super.getNextCommandData(command), + state: command.data.state, + 
assetSync: command.data.assetSync, + }; } /** diff --git a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js index c7e7210694..bda74a7bb0 100644 --- a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js +++ b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js @@ -5,6 +5,7 @@ import { OPERATION_ID_STATUS, ERROR_TYPE, TRIPLE_STORE_REPOSITORIES, + SERVICE_AGREEMENT_SOURCES, } from '../../../../../constants/constants.js'; class HandleStoreRequestCommand extends HandleProtocolMessageCommand { @@ -84,6 +85,7 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand { keyword, assertionId, stateIndex, + SERVICE_AGREEMENT_SOURCES.NODE, ); await this.operationIdService.updateOperationIdStatus( diff --git a/src/commands/protocols/publish/sender/publish-schedule-messages-command.js b/src/commands/protocols/publish/sender/publish-schedule-messages-command.js index e8bec24443..1013099ed7 100644 --- a/src/commands/protocols/publish/sender/publish-schedule-messages-command.js +++ b/src/commands/protocols/publish/sender/publish-schedule-messages-command.js @@ -59,38 +59,32 @@ class PublishScheduleMessagesCommand extends ProtocolScheduleMessagesCommand { minAckResponses, operationId, ) { - const agreementId = await this.serviceAgreementService.generateId( + const blockchainAssertionSize = await this.blockchainModuleManager.getAssertionSize( + blockchain, + assertionId, + ); + + const agreementId = this.serviceAgreementService.generateId( blockchain, contract, tokenId, keyword, hashFunctionId, ); - const agreementData = await this.blockchainModuleManager.getAgreementData( blockchain, agreementId, ); - const r0 = await this.blockchainModuleManager.getR0(blockchain); + const r0 = await this.blockchainModuleManager.getR0(blockchain); - const blockchainAssertionSize = await this.blockchainModuleManager.getAssertionSize( + const serviceAgreementBid = await this.serviceAgreementService.calculateBid( blockchain, - assertionId, + blockchainAssertionSize, + agreementData, + r0, ); - const divisor = this.blockchainModuleManager - .toBigNumber(blockchain, r0) - .mul(Number(agreementData.epochsNumber)) - .mul(blockchainAssertionSize); - - const serviceAgreementBid = this.blockchainModuleManager - .toBigNumber(blockchain, agreementData.tokenAmount) - .add(agreementData.updateTokenAmount) - .mul(1024) - .div(divisor) - .add(1); // add 1 wei because of the precision loss - let validBids = 0; await Promise.all( @@ -116,9 +110,8 @@ class PublishScheduleMessagesCommand extends ProtocolScheduleMessagesCommand { async getAsk(blockchain, nodeId) { const peerRecord = await this.repositoryModuleManager.getPeerRecord(nodeId, blockchain); - const ask = this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); - return this.blockchainModuleManager.toBigNumber(blockchain, ask); + return this.blockchainModuleManager.convertToWei(blockchain, peerRecord.ask); } /** diff --git a/src/commands/protocols/update/receiver/delete-pending-state-command.js b/src/commands/protocols/update/receiver/delete-pending-state-command.js index defa18d8d2..4edf754a55 100644 --- a/src/commands/protocols/update/receiver/delete-pending-state-command.js +++ b/src/commands/protocols/update/receiver/delete-pending-state-command.js @@ -1,5 +1,9 @@ import Command from '../../../command.js'; -import { ERROR_TYPE, PENDING_STORAGE_REPOSITORIES } from 
'../../../../constants/constants.js'; +import { + ERROR_TYPE, + PENDING_STORAGE_REPOSITORIES, + TRIPLE_STORE_REPOSITORIES, +} from '../../../../constants/constants.js'; class DeletePendingStateCommand extends Command { constructor(ctx) { @@ -11,55 +15,103 @@ class DeletePendingStateCommand extends Command { } async execute(command) { - const { blockchain, contract, tokenId, assertionId, operationId } = command.data; + const { blockchain, contract, tokenId, assertionId, operationId, keyword, hashFunctionId } = + command.data; this.logger.trace( `Started ${command.name} for blockchain: ${blockchain} contract: ${contract}, ` + `token id: ${tokenId}, assertion id: ${assertionId}`, ); - const assetStates = await this.blockchainModuleManager.getAssertionIds( + const pendingStateExists = await this.pendingStateExists( blockchain, contract, tokenId, + assertionId, ); - if (assetStates.includes(assertionId)) { + if (pendingStateExists) { this.logger.trace( - `Not clearing the pending storage as state was finalized and clearing is triggered by StateFinalized event.`, + `Pending state exists for token id: ${tokenId}, assertion id: ${assertionId}, blockchain: ${blockchain} and operationId: ${operationId}`, ); - return Command.empty(); + const assetStates = await this.blockchainModuleManager.getAssertionIds( + blockchain, + contract, + tokenId, + ); + if (assetStates.includes(assertionId)) { + const stateIndex = assetStates.indexOf(assertionId); + this.logger.trace( + `Node missed state finalized event for token id: ${tokenId}, assertion id: ${assertionId}, blockchain: ${blockchain} and operationId: ${operationId}. Node will now move data from pending storage to triple store`, + ); + await Promise.all([ + this.pendingStorageService.moveAndDeletePendingState( + TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, + TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY, + PENDING_STORAGE_REPOSITORIES.PUBLIC, + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + assertionId, + stateIndex, + ), + this.pendingStorageService.moveAndDeletePendingState( + TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT, + TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY, + PENDING_STORAGE_REPOSITORIES.PRIVATE, + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + assertionId, + stateIndex, + ), + ]); + } + await this.deletePendingState(blockchain, contract, tokenId, assertionId, operationId); } + return Command.empty(); + } + async deletePendingState(blockchain, contract, tokenId, assertionId, operationId) { for (const repository of [ PENDING_STORAGE_REPOSITORIES.PUBLIC, PENDING_STORAGE_REPOSITORIES.PRIVATE, ]) { // eslint-disable-next-line no-await-in-loop - const pendingStateExists = await this.pendingStorageService.assetHasPendingState( + await this.pendingStorageService.removeCachedAssertion( repository, blockchain, contract, tokenId, assertionId, + operationId, ); + } + } - if (!pendingStateExists) { - continue; - } - + async pendingStateExists(blockchain, contract, tokenId, assertionId) { + for (const repository of [ + PENDING_STORAGE_REPOSITORIES.PUBLIC, + PENDING_STORAGE_REPOSITORIES.PRIVATE, + ]) { // eslint-disable-next-line no-await-in-loop - await this.pendingStorageService.removeCachedAssertion( + const pendingStateExists = await this.pendingStorageService.assetHasPendingState( repository, blockchain, contract, tokenId, assertionId, - operationId, ); - } - return Command.empty(); + if (pendingStateExists) { + return true; + } + } + return false; } /** diff --git 
a/src/commands/protocols/update/receiver/submit-update-commit-command.js b/src/commands/protocols/update/receiver/submit-update-commit-command.js index 3ad8effde6..21678393b3 100644 --- a/src/commands/protocols/update/receiver/submit-update-commit-command.js +++ b/src/commands/protocols/update/receiver/submit-update-commit-command.js @@ -4,6 +4,7 @@ import { ERROR_TYPE, COMMAND_RETRIES, COMMAND_TX_GAS_INCREASE_FACTORS, + CONTRACT_FUNCTION_FIXED_GAS_PRICE, } from '../../../../constants/constants.js'; class SubmitUpdateCommitCommand extends Command { @@ -12,6 +13,7 @@ class SubmitUpdateCommitCommand extends Command { this.commandExecutor = ctx.commandExecutor; this.blockchainModuleManager = ctx.blockchainModuleManager; this.operationIdService = ctx.operationIdService; + this.serviceAgreementService = ctx.serviceAgreementService; this.errorType = ERROR_TYPE.COMMIT_PROOF.SUBMIT_UPDATE_COMMIT_ERROR; } @@ -41,9 +43,9 @@ class SubmitUpdateCommitCommand extends Command { `Retry number: ${COMMAND_RETRIES.SUBMIT_UPDATE_COMMIT - command.retries + 1}`, ); - const epoch = await this.calculateCurrentEpoch( - Number(agreementData.startTime), - Number(agreementData.epochLength), + const epoch = await this.serviceAgreementService.calculateCurrentEpoch( + agreementData.startTime, + agreementData.epochLength, blockchain, ); @@ -74,8 +76,10 @@ class SubmitUpdateCommitCommand extends Command { return Command.empty(); } - - const txGasPrice = gasPrice ?? (await this.blockchainModuleManager.getGasPrice(blockchain)); + const txGasPrice = + gasPrice ?? + CONTRACT_FUNCTION_FIXED_GAS_PRICE[blockchain]?.SUBMIT_UPDATE_COMMIT ?? + (await this.blockchainModuleManager.getGasPrice(blockchain)); const transactionCompletePromise = new Promise((resolve, reject) => { this.blockchainModuleManager.submitUpdateCommit( @@ -176,11 +180,6 @@ class SubmitUpdateCommitCommand extends Command { return Command.empty(); } - async calculateCurrentEpoch(startTime, epochLength, blockchain) { - const now = await this.blockchainModuleManager.getBlockchainTimestamp(blockchain); - return Math.floor((Number(now) - Number(startTime)) / Number(epochLength)); - } - async retryFinished(command) { const { blockchain, operationId } = command.data; await this.handleError( diff --git a/src/constants/constants.js b/src/constants/constants.js index acb07b3772..6d46ad577e 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -65,13 +65,15 @@ export const TRIPLE_STORE_CONNECT_MAX_RETRIES = 10; export const DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS = 15 * 24 * 60 * 60 * 1000; // 15 days +export const MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS = 60 * 60 * 1000; // 1 hour + export const MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH = 50; export const TRANSACTION_QUEUE_CONCURRENCY = 1; export const TRIPLE_STORE_CONNECT_RETRY_FREQUENCY = 10; -export const MAX_FILE_SIZE = 2621440; +export const MAX_FILE_SIZE = 10000000; export const GET_STATES = { LATEST: 'LATEST', FINALIZED: 'LATEST_FINALIZED' }; @@ -187,23 +189,60 @@ export const DEFAULT_COMMAND_REPEAT_INTERVAL_IN_MILLS = 5000; // 5 seconds export const DEFAULT_COMMAND_DELAY_IN_MILLS = 60 * 1000; // 60 seconds +export const TRANSACTION_PRIORITY = { + HIGH: 1, + REGULAR: 2, +}; + +export const CONTRACT_FUNCTION_PRIORITY = { + 'submitCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': + TRANSACTION_PRIORITY.REGULAR, + 'submitCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.REGULAR, + 
'submitUpdateCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': + TRANSACTION_PRIORITY.HIGH, + 'submitUpdateCommit((address,uint256,bytes,uint8,uint16))': TRANSACTION_PRIORITY.HIGH, + sendProof: TRANSACTION_PRIORITY.REGULAR, +}; + export const COMMAND_RETRIES = { + SIMPLE_ASSET_SYNC: 1, SUBMIT_COMMIT: 5, SUBMIT_UPDATE_COMMIT: 5, SUBMIT_PROOFS: 5, }; +export const SIMPLE_ASSET_SYNC_PARAMETERS = { + GET_RESULT_POLLING_INTERVAL_MILLIS: 1 * 1000, + GET_RESULT_POLLING_MAX_ATTEMPTS: 30, +}; + export const COMMAND_TX_GAS_INCREASE_FACTORS = { SUBMIT_COMMIT: 1.2, SUBMIT_UPDATE_COMMIT: 1.2, SUBMIT_PROOFS: 1.2, }; +export const CONTRACT_FUNCTION_GAS_LIMIT_INCREASE_FACTORS = { + 'submitUpdateCommit((address,uint256,bytes,uint8,uint16,uint72,uint72,uint72))': 1.2, + 'submitUpdateCommit((address,uint256,bytes,uint8,uint16))': 1.2, +}; + export const GNOSIS_DEFAULT_GAS_PRICE = { TESTNET: 25, MAINNET: 5, }; +export const NEURO_DEFAULT_GAS_PRICE = { + TESTNET: 8, + MAINNET: 8, +}; + +export const CONTRACT_FUNCTION_FIXED_GAS_PRICE = { + 'otp:2043': { + SUBMIT_UPDATE_COMMIT: 15, + }, +}; + export const WEBSOCKET_PROVIDER_OPTIONS = { reconnect: { auto: true, @@ -298,6 +337,7 @@ export const ERROR_TYPE = { COMMIT_PROOF: { CALCULATE_PROOFS_ERROR: 'CalculateProofsError', EPOCH_CHECK_ERROR: 'EpochCheckError', + SIMPLE_ASSET_SYNC_ERROR: 'SimpleAssetSyncError', SUBMIT_COMMIT_ERROR: 'SubmitCommitError', SUBMIT_COMMIT_SEND_TX_ERROR: 'SubmitCommitSendTxError', SUBMIT_PROOFS_ERROR: 'SubmitProofsError', @@ -361,6 +401,8 @@ export const OPERATION_ID_STATUS = { COMMIT_PROOF: { EPOCH_CHECK_START: 'EPOCH_CHECK_START', EPOCH_CHECK_END: 'EPOCH_CHECK_END', + SIMPLE_ASSET_SYNC_START: 'SIMPLE_ASSET_SYNC_START', + SIMPLE_ASSET_SYNC_END: 'SIMPLE_ASSET_SYNC_END', SUBMIT_COMMIT_START: 'SUBMIT_COMMIT_START', SUBMIT_COMMIT_END: 'SUBMIT_COMMIT_END', SUBMIT_COMMIT_SEND_TX_START: 'SUBMIT_COMMIT_SEND_TX_START', @@ -396,6 +438,8 @@ export const OPERATIONS = { GET: 'get', }; +export const SERVICE_AGREEMENT_START_TIME_DELAY_FOR_COMMITS_SECONDS = 5 * 60; + /** * @constant {number} OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS - * operation id command cleanup interval time 24h @@ -577,12 +621,12 @@ export const CONTRACTS = { STAKING_CONTRACT: 'StakingContract', PROFILE_CONTRACT: 'ProfileContract', HUB_CONTRACT: 'HubContract', - // TODO: Update with new commit Managers + CONTENT_ASSET: 'ContentAssetContract', COMMIT_MANAGER_V1_U1_CONTRACT: 'CommitManagerV1U1Contract', SERVICE_AGREEMENT_V1_CONTRACT: 'ServiceAgreementV1Contract', PARAMETERS_STORAGE_CONTRACT: 'ParametersStorageContract', IDENTITY_STORAGE_CONTRACT: 'IdentityStorageContract', - Log2PLDSF_CONTRACT: 'Log2PLDSFContract', + LOG2PLDSF_CONTRACT: 'Log2PLDSFContract', LINEAR_SUM_CONTRACT: 'LinearSumContract', }; @@ -591,13 +635,35 @@ export const CONTRACT_EVENTS = { SHARDING_TABLE: ['NodeAdded', 'NodeRemoved'], STAKING: ['StakeIncreased', 'StakeWithdrawalStarted'], PROFILE: ['AskUpdated'], + CONTENT_ASSET: ['AssetMinted'], COMMIT_MANAGER_V1: ['StateFinalized'], - SERVICE_AGREEMENT_V1: ['ServiceAgreementV1Extended', 'ServiceAgreementV1Terminated'], + SERVICE_AGREEMENT_V1: [ + 'ServiceAgreementV1Created', + 'ServiceAgreementV1Extended', + 'ServiceAgreementV1Terminated', + ], PARAMETERS_STORAGE: ['ParameterChanged'], - Log2PLDSF: ['ParameterChanged'], + LOG2PLDSF: ['ParameterChanged'], LINEAR_SUM: ['ParameterChanged'], }; +export const GROUPED_CONTRACT_EVENTS = { + AssetCreatedGroup: { + events: ['AssetMinted', 'ServiceAgreementV1Created'], + groupingKey: 'tokenId', + }, +}; + 
+export const CONTRACT_EVENT_TO_GROUP_MAPPING = (() => { + const mapping = {}; + Object.entries(GROUPED_CONTRACT_EVENTS).forEach(([groupName, { events }]) => { + events.forEach((eventName) => { + mapping[eventName] = groupName; + }); + }); + return mapping; +})(); + export const NODE_ENVIRONMENTS = { DEVELOPMENT: 'development', TEST: 'test', @@ -624,6 +690,13 @@ export const BLOCK_TIME_MILLIS = { export const TRANSACTION_CONFIRMATIONS = 1; +export const SERVICE_AGREEMENT_SOURCES = { + BLOCKCHAIN: 'blockchain', + EVENT: 'event', + CLIENT: 'client', + NODE: 'node', +}; + export const CACHE_DATA_TYPES = { NUMBER: 'number', ANY: 'any', @@ -648,6 +721,8 @@ export const CACHED_FUNCTIONS = { epochLength: CACHE_DATA_TYPES.NUMBER, minimumStake: CACHE_DATA_TYPES.ANY, maximumStake: CACHE_DATA_TYPES.ANY, + minProofWindowOffsetPerc: CACHE_DATA_TYPES.NUMBER, + maxProofWindowOffsetPerc: CACHE_DATA_TYPES.NUMBER, }, IdentityStorageContract: { getIdentityId: CACHE_DATA_TYPES.NUMBER, diff --git a/src/migration/service-agreements-data-inspector.js b/src/migration/service-agreements-data-inspector.js index 1f8bca1c75..aad52f2fe8 100644 --- a/src/migration/service-agreements-data-inspector.js +++ b/src/migration/service-agreements-data-inspector.js @@ -176,7 +176,7 @@ class ServiceAgreementsDataInspector extends BaseMigration { isInvalid = true; } - const agreementId = await this.serviceAgreementService.generateId( + const agreementId = this.serviceAgreementService.generateId( serviceAgreement.blockchainId, serviceAgreement.assetStorageContractAddress, serviceAgreement.tokenId, diff --git a/src/migration/service-agreements-metadata-migration.js b/src/migration/service-agreements-metadata-migration.js index 8c29a8771c..382c2eaf94 100644 --- a/src/migration/service-agreements-metadata-migration.js +++ b/src/migration/service-agreements-metadata-migration.js @@ -6,6 +6,7 @@ import { CONTENT_ASSET_HASH_FUNCTION_ID, SCHEMA_CONTEXT, TRIPLE_STORE_REPOSITORIES, + SERVICE_AGREEMENT_SOURCES, } from '../constants/constants.js'; class ServiceAgreementsMetadataMigration extends BaseMigration { @@ -131,7 +132,7 @@ class ServiceAgreementsMetadataMigration extends BaseMigration { ); // generate agreement id - const agreementId = await this.serviceAgreementService.generateId( + const agreementId = this.serviceAgreementService.generateId( blockchain, contract, tokenId, @@ -223,6 +224,7 @@ class ServiceAgreementsMetadataMigration extends BaseMigration { keyword, assertionId, stateIndex, + SERVICE_AGREEMENT_SOURCES.BLOCKCHAIN, lastCommitEpoch, lastProofEpoch, ); diff --git a/src/modules/blockchain/blockchain-module-manager.js b/src/modules/blockchain/blockchain-module-manager.js index 202a99694e..eccb49a396 100644 --- a/src/modules/blockchain/blockchain-module-manager.js +++ b/src/modules/blockchain/blockchain-module-manager.js @@ -82,6 +82,23 @@ class BlockchainModuleManager extends BaseModuleManager { return this.callImplementationFunction(blockchain, 'restartService'); } + async getMinProofWindowOffsetPerc(blockchain) { + return this.callImplementationFunction(blockchain, 'getMinProofWindowOffsetPerc'); + } + + async getMaxProofWindowOffsetPerc(blockchain) { + return this.callImplementationFunction(blockchain, 'getMaxProofWindowOffsetPerc'); + } + + async generatePseudorandomUint8(blockchain, assetCreator, blockNumber, blockTimestamp, limit) { + return this.callImplementationFunction(blockchain, 'generatePseudorandomUint8', [ + assetCreator, + blockNumber, + blockTimestamp, + limit, + ]); + } + async 
getAssertionIdByIndex(blockchain, assetContractAddress, tokenId, index) { return this.callImplementationFunction(blockchain, 'getAssertionIdByIndex', [ assetContractAddress, @@ -140,6 +157,10 @@ class BlockchainModuleManager extends BaseModuleManager { ]); } + async getTransaction(blockchain, transactionHash) { + return this.callImplementationFunction(blockchain, 'getTransaction', [transactionHash]); + } + async getAllPastEvents( blockchain, contractName, @@ -388,6 +409,10 @@ class BlockchainModuleManager extends BaseModuleManager { return this.callImplementationFunction(blockchain, 'isHashFunction', [hashFunctionId]); } + getScoreFunctionIds(blockchain) { + return this.callImplementationFunction(blockchain, 'getScoreFunctionIds'); + } + async getLog2PLDSFParams(blockchain) { return this.callImplementationFunction(blockchain, 'getLog2PLDSFParams'); } diff --git a/src/modules/blockchain/implementation/gnosis/gnosis-service.js b/src/modules/blockchain/implementation/gnosis/gnosis-service.js index e1353b8236..431ef46466 100644 --- a/src/modules/blockchain/implementation/gnosis/gnosis-service.js +++ b/src/modules/blockchain/implementation/gnosis/gnosis-service.js @@ -19,26 +19,34 @@ class GnosisService extends Web3Service { } async getGasPrice() { + let gasPrice; try { const response = await axios.get(this.config.gasPriceOracleLink); - let gasPrice; - if (this.config.name.split(':')[1] === '100') { + if (response?.data?.average) { + // Returns gwei + gasPrice = Number(response.data.average); + this.logger.debug(`Gas price from Gnosis oracle link: ${gasPrice} gwei`); + } else if (response?.data?.result) { + // Returns wei gasPrice = Number(response.data.result, 10); - } else if (this.config.name.split(':')[1] === '10200') { - gasPrice = Math.round(response.data.average * 1e9); + this.logger.debug(`Gas price from Gnosis oracle link: ${gasPrice} wei`); + return gasPrice; + } else { + throw Error( + `Gas price oracle: ${this.config.gasPriceOracleLink} returns gas price in unsupported format.`, + ); } - this.logger.debug(`Gas price on Gnosis: ${gasPrice}`); - return gasPrice; } catch (error) { const defaultGasPrice = - process.env.NODE_ENV === NODE_ENVIRONMENTS.MAINNET + process.env.NODE_ENV === NODE_ENVIRONMENTS.MAINNET ? GNOSIS_DEFAULT_GAS_PRICE.MAINNET : GNOSIS_DEFAULT_GAS_PRICE.TESTNET; this.logger.warn( `Failed to fetch the gas price from the Gnosis: ${error}. 
Using default value: ${defaultGasPrice} Gwei.`, ); - this.convertToWei(defaultGasPrice, 'gwei'); + gasPrice = defaultGasPrice; } + return this.convertToWei(gasPrice, 'gwei'); } async healthCheck() { diff --git a/src/modules/blockchain/implementation/hardhat/hardhat-service.js b/src/modules/blockchain/implementation/hardhat/hardhat-service.js index 603a1c630c..e52efd54bb 100644 --- a/src/modules/blockchain/implementation/hardhat/hardhat-service.js +++ b/src/modules/blockchain/implementation/hardhat/hardhat-service.js @@ -20,6 +20,10 @@ class HardhatService extends Web3Service { async providerReady() { return this.provider.ready; } + + async getGasPrice() { + return this.convertToWei(20, 'wei'); + } } export default HardhatService; diff --git a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js index 348b4e19e9..34f64bb5b6 100644 --- a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js +++ b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js @@ -1,6 +1,10 @@ import { ApiPromise, WsProvider, HttpProvider } from '@polkadot/api'; import { ethers } from 'ethers'; -import { BLOCK_TIME_MILLIS } from '../../../../constants/constants.js'; +import { + BLOCK_TIME_MILLIS, + NEURO_DEFAULT_GAS_PRICE, + NODE_ENVIRONMENTS, +} from '../../../../constants/constants.js'; import Web3Service from '../web3-service.js'; const NATIVE_TOKEN_DECIMALS = 12; @@ -127,7 +131,11 @@ class OtParachainService extends Web3Service { try { return this.provider.getGasPrice(); } catch (error) { - return undefined; + const defaultGasPrice = + process.env.NODE_ENV === NODE_ENVIRONMENTS.MAINNET + ? NEURO_DEFAULT_GAS_PRICE.MAINNET + : NEURO_DEFAULT_GAS_PRICE.TESTNET; + return this.convertToWei(defaultGasPrice, 'wei'); } } diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 557987ce66..583fd3aef6 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -9,7 +9,6 @@ import { SOLIDITY_PANIC_CODE_PREFIX, SOLIDITY_PANIC_REASONS, ZERO_PREFIX, - DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS, MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH, TRANSACTION_QUEUE_CONCURRENCY, TRANSACTION_POLLING_TIMEOUT_MILLIS, @@ -22,11 +21,16 @@ import { CACHED_FUNCTIONS, CACHE_DATA_TYPES, CONTRACTS, + CONTRACT_FUNCTION_PRIORITY, + TRANSACTION_PRIORITY, + CONTRACT_FUNCTION_GAS_LIMIT_INCREASE_FACTORS, + MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS, } from '../../../constants/constants.js'; const require = createRequire(import.meta.url); const ABIs = { + ContentAsset: require('dkg-evm-module/abi/ContentAsset.json'), ContentAssetStorage: require('dkg-evm-module/abi/ContentAssetStorage.json'), AssertionStorage: require('dkg-evm-module/abi/AssertionStorage.json'), Staking: require('dkg-evm-module/abi/Staking.json'), @@ -95,16 +99,33 @@ class Web3Service { queueTransaction(contractInstance, functionName, transactionArgs, callback, gasPrice) { const selectedQueue = this.selectTransactionQueue(); - - selectedQueue.push( - { - contractInstance, - functionName, - transactionArgs, - gasPrice, - }, - callback, - ); + const priority = CONTRACT_FUNCTION_PRIORITY[functionName] ?? 
TRANSACTION_PRIORITY.REGULAR; + this.logger.info(`Calling ${functionName} with priority: ${priority}`); + switch (priority) { + case TRANSACTION_PRIORITY.HIGH: + selectedQueue.unshift( + { + contractInstance, + functionName, + transactionArgs, + gasPrice, + }, + callback, + ); + break; + case TRANSACTION_PRIORITY.REGULAR: + default: + selectedQueue.push( + { + contractInstance, + functionName, + transactionArgs, + gasPrice, + }, + callback, + ); + break; + } } removeTransactionQueue(walletAddress) { @@ -596,8 +617,7 @@ class Web3Service { operationalWallet, ) { let result; - const gasPrice = - predefinedGasPrice ?? (await this.getGasPrice()) ?? this.convertToWei(20, 'gwei'); + const gasPrice = predefinedGasPrice ?? (await this.getGasPrice()); let gasLimit; try { @@ -609,6 +629,10 @@ class Web3Service { gasLimit = gasLimit ?? this.convertToWei(900, 'kwei'); + const gasLimitMultiplier = CONTRACT_FUNCTION_GAS_LIMIT_INCREASE_FACTORS[functionName] ?? 1; + + gasLimit = gasLimit.mul(gasLimitMultiplier * 100).div(100); + this.logger.debug( `Sending signed transaction ${functionName} to the blockchain ${this.getBlockchainId()}` + ` with gas limit: ${gasLimit.toString()} and gasPrice ${gasPrice.toString()}. ` + @@ -866,6 +890,10 @@ class Web3Service { return value.toString(); } + async getTransaction(transactionHash) { + return this.provider.getTransaction(transactionHash); + } + async getAllPastEvents( blockchainId, contractName, @@ -879,12 +907,18 @@ class Web3Service { // this will happen when we have different set of contracts on different blockchains // eg LinearSum contract is available on gnosis but not on NeuroWeb, so the node should not fetch events // from LinearSum contract on NeuroWeb blockchain - return []; + return { + events: [], + lastCheckedBlock: currentBlock, + eventsMissed: false, + }; } let fromBlock; - if (this.isOlderThan(lastCheckedTimestamp, DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS)) { + let eventsMissed = false; + if (this.startBlock - lastCheckedBlock > this.getMaxNumberOfHistoricalBlocksForSync()) { fromBlock = this.startBlock; + eventsMissed = true; } else { fromBlock = lastCheckedBlock + 1; } @@ -897,30 +931,55 @@ class Web3Service { } const events = []; - while (fromBlock <= currentBlock) { - const toBlock = Math.min( - fromBlock + MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH - 1, - currentBlock, + let toBlock = currentBlock; + try { + while (fromBlock <= currentBlock) { + toBlock = Math.min( + fromBlock + MAXIMUM_NUMBERS_OF_BLOCKS_TO_FETCH - 1, + currentBlock, + ); + const newEvents = await this.processBlockRange( + fromBlock, + toBlock, + contract, + topics, + ); + newEvents.forEach((e) => events.push(...e)); + fromBlock = toBlock + 1; + } + } catch (error) { + this.logger.warn( + `Unable to process block range from: ${fromBlock} to: ${toBlock} for contract ${contractName} on blockchain: ${blockchainId}. Error: ${error.message}`, ); - const newEvents = await this.processBlockRange(fromBlock, toBlock, contract, topics); - newEvents.forEach((e) => events.push(...e)); - fromBlock = toBlock + 1; } - return events.map((event) => ({ - contract: contractName, - event: event.event, - data: JSON.stringify( - Object.fromEntries( - Object.entries(event.args).map(([k, v]) => [ - k, - ethers.BigNumber.isBigNumber(v) ? v.toString() : v, - ]), + return { + events: events.map((event) => ({ + contract: contractName, + event: event.event, + data: JSON.stringify( + Object.fromEntries( + Object.entries(event.args).map(([k, v]) => [ + k, + ethers.BigNumber.isBigNumber(v) ? 
v.toString() : v,
+                        ]),
+                    ),
                 ),
-            ),
-            block: event.blockNumber,
-            blockchainId,
-        }));
+                block: event.blockNumber,
+                blockchainId,
+            })),
+            lastCheckedBlock: toBlock,
+            eventsMissed,
+        };
+    }
+
+    getMaxNumberOfHistoricalBlocksForSync() {
+        if (!this.maxNumberOfHistoricalBlocksForSync) {
+            this.maxNumberOfHistoricalBlocksForSync = Math.round(
+                MAX_BLOCKCHAIN_EVENT_SYNC_OF_HISTORICAL_BLOCKS_IN_MILLS / this.getBlockTimeMillis(),
+            );
+        }
+        return this.maxNumberOfHistoricalBlocksForSync;
     }

     async processBlockRange(fromBlock, toBlock, contract, topics) {
@@ -942,6 +1001,36 @@
         ]);
     }

+    async getMinProofWindowOffsetPerc() {
+        return this.callContractFunction(
+            this.ParametersStorageContract,
+            'minProofWindowOffsetPerc',
+            [],
+            CONTRACTS.PARAMETERS_STORAGE_CONTRACT,
+        );
+    }
+
+    async getMaxProofWindowOffsetPerc() {
+        return this.callContractFunction(
+            this.ParametersStorageContract,
+            'maxProofWindowOffsetPerc',
+            [],
+            CONTRACTS.PARAMETERS_STORAGE_CONTRACT,
+        );
+    }
+
+    async generatePseudorandomUint8(assetCreator, blockNumber, blockTimestamp, limit) {
+        const encodedData = ethers.utils.solidityPack(
+            ['uint256', 'address', 'uint256'],
+            [blockTimestamp, assetCreator, blockNumber],
+        );
+        const hash = ethers.utils.keccak256(encodedData);
+        const hashBigNumber = ethers.BigNumber.from(hash);
+        const hashModulo = hashBigNumber.mod(limit);
+
+        return hashModulo.mod(256);
+    }
+
     async getAssertionIdByIndex(assetContractAddress, tokenId, index) {
         const assetStorageContractInstance =
             this.assetStorageContracts[assetContractAddress.toLowerCase()];
@@ -1335,7 +1424,7 @@
     }

     convertToWei(value, fromUnit = 'ether') {
-        return ethers.utils.parseUnits(value.toString(), fromUnit).toString();
+        return ethers.utils.parseUnits(value.toString(), fromUnit);
     }

     convertFromWei(value, toUnit = 'ether') {
@@ -1403,12 +1492,16 @@
         ]);
     }

+    getScoreFunctionIds() {
+        return Object.keys(this.scoringFunctionsContracts);
+    }
+
     async getLog2PLDSFParams() {
         const log2pldsfParams = await this.callContractFunction(
             this.scoringFunctionsContracts[1],
             'getParameters',
             [],
-            CONTRACTS.Log2PLDSF_CONTRACT,
+            CONTRACTS.LOG2PLDSF_CONTRACT,
         );

         const params = {};
diff --git a/src/modules/repository/implementation/sequelize/migrations/20240221162000-add-service-agreement-data-source.js b/src/modules/repository/implementation/sequelize/migrations/20240221162000-add-service-agreement-data-source.js
new file mode 100644
index 0000000000..be3c575c89
--- /dev/null
+++ b/src/modules/repository/implementation/sequelize/migrations/20240221162000-add-service-agreement-data-source.js
@@ -0,0 +1,11 @@
+import { SERVICE_AGREEMENT_SOURCES } from '../../../../../constants/constants.js';
+
+export async function up({ context: { queryInterface, Sequelize } }) {
+    await queryInterface.addColumn('service_agreement', 'data_source', {
+        type: Sequelize.ENUM(...Object.values(SERVICE_AGREEMENT_SOURCES)),
+    });
+}
+
+export async function down({ context: { queryInterface } }) {
+    await queryInterface.removeColumn('service_agreement', 'data_source');
+}
diff --git a/src/modules/repository/implementation/sequelize/models/service-agreement.js b/src/modules/repository/implementation/sequelize/models/service-agreement.js
index e0bc30fd9f..b00bb300fd 100644
--- a/src/modules/repository/implementation/sequelize/models/service-agreement.js
+++ b/src/modules/repository/implementation/sequelize/models/service-agreement.js
@@ -1,3 +1,5 @@
+import { SERVICE_AGREEMENT_SOURCES } from
'../../../../../constants/constants.js'; + export default (sequelize, DataTypes) => { const serviceAgreement = sequelize.define( 'service_agreement', @@ -54,6 +56,9 @@ export default (sequelize, DataTypes) => { type: DataTypes.TINYINT.UNSIGNED, allowNull: false, }, + dataSource: { + type: DataTypes.ENUM(...Object.values(SERVICE_AGREEMENT_SOURCES)), + }, lastCommitEpoch: { type: DataTypes.SMALLINT.UNSIGNED, }, diff --git a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js index f19fe1530f..92f4edf035 100644 --- a/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/blockchain-event-repository.js @@ -58,7 +58,7 @@ class BlockchainEventRepository { } async markBlockchainEventsAsProcessed(events) { - const idsForUpdate = events.map((event) => event.id); + const idsForUpdate = events.flatMap((event) => event.id); return this.model.update( { processed: true }, { diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index 1e8124f3cf..5682b06970 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -43,6 +43,7 @@ class ServiceAgreementRepository { keyword, assertionId, stateIndex, + dataSource, lastCommitEpoch, lastProofEpoch, ) { @@ -60,6 +61,7 @@ class ServiceAgreementRepository { keyword, assertionId, stateIndex, + dataSource, lastCommitEpoch, lastProofEpoch, }); @@ -127,7 +129,13 @@ class ServiceAgreementRepository { }); } - getEligibleAgreementsForSubmitCommit(timestampSeconds, blockchain, commitWindowDurationPerc) { + getEligibleAgreementsForSubmitCommit( + timestampSeconds, + blockchain, + commitWindowDurationPerc, + startTimeDelay, + ) { + const cutoffTimestamp = timestampSeconds - startTimeDelay; const currentEpoch = `FLOOR((${timestampSeconds} - start_time) / epoch_length)`; const currentEpochPerc = `((${timestampSeconds} - start_time) % epoch_length) / epoch_length * 100`; @@ -146,6 +154,9 @@ class ServiceAgreementRepository { }, where: { blockchainId: blockchain, + start_time: { + [Sequelize.Op.lt]: cutoffTimestamp, + }, [Sequelize.Op.or]: [ { lastCommitEpoch: { diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 240f0cf64e..b510558bd9 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -306,6 +306,7 @@ class RepositoryModuleManager extends BaseModuleManager { keyword, assertionId, stateIndex, + dataSource, lastCommitEpoch, lastProofEpoch, ) { @@ -324,6 +325,7 @@ class RepositoryModuleManager extends BaseModuleManager { keyword, assertionId, stateIndex, + dataSource, lastCommitEpoch, lastProofEpoch, ); @@ -388,12 +390,16 @@ class RepositoryModuleManager extends BaseModuleManager { timestampSeconds, blockchain, commitWindowDurationPerc, + ask, + startTimeDelay, ) { if (this.initialized) { return this.getRepository('service_agreement').getEligibleAgreementsForSubmitCommit( timestampSeconds, blockchain, commitWindowDurationPerc, + ask, + startTimeDelay, ); } } diff --git 
a/src/service/blockchain-event-listener-service.js b/src/service/blockchain-event-listener-service.js
index d49436ae87..6818420e95 100644
--- a/src/service/blockchain-event-listener-service.js
+++ b/src/service/blockchain-event-listener-service.js
@@ -9,6 +9,9 @@ import {
     CONTRACT_EVENTS,
     MAXIMUM_FETCH_EVENTS_FAILED_COUNT,
     DELAY_BETWEEN_FAILED_FETCH_EVENTS_MILLIS,
+    CONTRACT_EVENT_TO_GROUP_MAPPING,
+    GROUPED_CONTRACT_EVENTS,
+    SERVICE_AGREEMENT_SOURCES,
 } from '../constants/constants.js';

 const fetchEventsFailedCount = {};
@@ -24,6 +27,10 @@ class BlockchainEventListenerService {
         this.pendingStorageService = ctx.pendingStorageService;
         this.ualService = ctx.ualService;
         this.hashingService = ctx.hashingService;
+        this.serviceAgreementService = ctx.serviceAgreementService;
+        this.shardingTableService = ctx.shardingTableService;
+
+        this.eventGroupsBuffer = {};
     }

     async initialize() {
@@ -32,6 +39,7 @@
             this.logger.info(
                 `Initializing blockchain event listener for blockchain ${blockchainId}, handling missed events`,
             );
+            this.eventGroupsBuffer[blockchainId] = {};
             promises.push(this.fetchAndHandleBlockchainEvents(blockchainId));
         }
         await Promise.all(promises);
@@ -50,7 +58,27 @@
             process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST;

         const currentBlock = await this.blockchainModuleManager.getBlockNumber(blockchainId);
+
+        if (devEnvironment) {
+            // Handle sharding table node-added events first for tests and local network setup,
+            // because of a race condition between the node-added and ask-updated events
+            const shardingTableEvents = await this.getContractEvents(
+                blockchainId,
+                CONTRACTS.SHARDING_TABLE_CONTRACT,
+                currentBlock,
+                CONTRACT_EVENTS.SHARDING_TABLE,
+            );
+
+            await this.handleBlockchainEvents(shardingTableEvents, blockchainId);
+        }
+
         const syncContractEventsPromises = [
+            this.getContractEvents(
+                blockchainId,
+                CONTRACTS.CONTENT_ASSET,
+                currentBlock,
+                CONTRACT_EVENTS.CONTENT_ASSET,
+            ),
             this.getContractEvents(
                 blockchainId,
                 CONTRACTS.SHARDING_TABLE_CONTRACT,
@@ -69,7 +97,6 @@
                 currentBlock,
                 CONTRACT_EVENTS.PROFILE,
             ),
-            // TODO: Update with new commit managers
             this.getContractEvents(
                 blockchainId,
                 CONTRACTS.COMMIT_MANAGER_V1_U1_CONTRACT,
@@ -90,9 +117,9 @@
             ),
             this.getContractEvents(
                 blockchainId,
-                CONTRACTS.Log2PLDSF_CONTRACT,
+                CONTRACTS.LOG2PLDSF_CONTRACT,
                 currentBlock,
-                CONTRACT_EVENTS.Log2PLDSF,
+                CONTRACT_EVENTS.LOG2PLDSF,
             ),
             this.getContractEvents(
                 blockchainId,
@@ -168,7 +195,7 @@
             contractName,
         );

-        const events = await this.blockchainModuleManager.getAllPastEvents(
+        const result = await this.blockchainModuleManager.getAllPastEvents(
             blockchainId,
             contractName,
             eventsToFilter,
@@ -179,12 +206,16 @@
         await this.repositoryModuleManager.updateLastCheckedBlock(
             blockchainId,
-            currentBlock,
+            result.lastCheckedBlock,
             Date.now(0),
             contractName,
         );

-        return events;
+        if (result.eventsMissed) {
+            await this.shardingTableService.pullBlockchainShardingTable(blockchainId, true);
+        }
+
+        return result.events;
     }

     async handleBlockchainEvents(events, blockchainId) {
@@ -206,29 +237,78 @@
             this.logger.trace(
                 `Processing ${unprocessedEvents.length} blockchain events on blockchain ${blockchainId}.`,
             );
-            let groupedEvents = {};
-            let currentBlock = 0;
+            let batchedEvents = {};
+            let currentBlockNumber = 0;
             for (const event of
unprocessedEvents) {
-            if (event.block !== currentBlock) {
+            if (event.block !== currentBlockNumber) {
                 // eslint-disable-next-line no-await-in-loop
-                await this.handleBlockGroupedEvents(groupedEvents);
-                groupedEvents = {};
-                currentBlock = event.block;
+                await this.handleBlockBatchedEvents(batchedEvents);
+                batchedEvents = {};
+                currentBlockNumber = event.block;
             }
-            if (groupedEvents[event.event]) {
-                groupedEvents[event.event].push(event);
+
+            // Check if the event should be grouped with other events
+            const eventsGroupName = CONTRACT_EVENT_TO_GROUP_MAPPING[event.event];
+            if (eventsGroupName) {
+                // Get the Events Group object containing predefined events and the Grouping Key (Event Argument)
+                const eventsGroup = GROUPED_CONTRACT_EVENTS[eventsGroupName];
+                // Get the value of the Grouping Key from the Event
+                const groupingKeyValue = JSON.parse(event.data)[eventsGroup.groupingKey];
+
+                if (!this.eventGroupsBuffer[blockchainId][eventsGroupName]) {
+                    this.eventGroupsBuffer[blockchainId][eventsGroupName] = {};
+                }
+
+                if (!this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue]) {
+                    this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue] =
+                        [];
+                }
+
+                // Push the event to the buffer until the Events Group is full
+                this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue].push(
+                    event,
+                );
+
+                // Mark the event as processed
+                // TODO: There should be a smarter way to do this, because it will cause trouble
+                // if the node goes offline after catching only some of the events from the group
+                // and not all of them. The buffer will be cleared, but the events are already marked as processed.
+                // eslint-disable-next-line no-await-in-loop
+                await this.repositoryModuleManager.markBlockchainEventsAsProcessed([event]);
+
+                // When all expected Events from the Events Group are collected
+                if (
+                    this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue]
+                        .length === eventsGroup.events.length
+                ) {
+                    if (!batchedEvents[eventsGroupName]) {
+                        batchedEvents[eventsGroupName] = [];
+                    }
+
+                    // Add the Events Group to the Processing Queue
+                    batchedEvents[eventsGroupName].push(
+                        this.eventGroupsBuffer[blockchainId][eventsGroupName][groupingKeyValue],
+                    );
+
+                    // Remove the Events Group from the Buffer
+                    delete this.eventGroupsBuffer[blockchainId][eventsGroupName][
+                        groupingKeyValue
+                    ];
+                }
+            } else if (batchedEvents[event.event]) {
+                batchedEvents[event.event].push(event);
             } else {
-                groupedEvents[event.event] = [event];
+                batchedEvents[event.event] = [event];
             }
         }

-        await this.handleBlockGroupedEvents(groupedEvents);
+        await this.handleBlockBatchedEvents(batchedEvents);
         }
     }

-    async handleBlockGroupedEvents(groupedEvents) {
+    async handleBlockBatchedEvents(batchedEvents) {
         const handleBlockEventsPromises = [];
-        for (const [eventName, blockEvents] of Object.entries(groupedEvents)) {
+        for (const [eventName, blockEvents] of Object.entries(batchedEvents)) {
             handleBlockEventsPromises.push(this.handleBlockEvents(eventName, blockEvents));
         }
         // eslint-disable-next-line no-await-in-loop
@@ -254,12 +334,12 @@
         const { blockchainId, contract, data } = event;
         const { parameterName, parameterValue } = JSON.parse(data);
         switch (contract) {
-            case CONTRACTS.Log2PLDSF_CONTRACT:
+            case CONTRACTS.LOG2PLDSF_CONTRACT:
                 // This invalidates contracts parameter
                 // TODO: Create function for contract call cache invalidation
                 this.blockchainModuleManager.setContractCallCache(
                     blockchainId,
-                    CONTRACTS.Log2PLDSF_CONTRACT,
+                    CONTRACTS.LOG2PLDSF_CONTRACT,
                     parameterName,
                     null,
); @@ -310,7 +390,10 @@ class BlockchainEventListenerService { ); if (contractName === CONTRACTS.SHARDING_TABLE_CONTRACT) { - await this.repositoryModuleManager.cleanShardingTable(event.blockchainId); + await this.shardingTableService.pullBlockchainShardingTable( + event.blockchainId, + true, + ); } }), ); @@ -437,16 +520,87 @@ class BlockchainEventListenerService { ); } - async handleServiceAgreementV1ExtendedEvents(blockEvents) { + async handleAssetCreatedGroupEvents(blockGroupEvents) { await Promise.all( - blockEvents.map(async (event) => { - const { agreementId } = JSON.parse(event.data); + blockGroupEvents.map(async (eventsGroup) => { + // Parse and combine Arguments of both AssetMinted and ServiceAgreementCreated Events + const combinedData = eventsGroup.reduce((accumulator, event) => { + try { + const eventData = JSON.parse(event.data); + return { + ...accumulator, + ...eventData, + blockchainId: event.blockchainId, + }; + } catch (error) { + this.logger.error(`Error parsing event data: ${error}`); + return accumulator; + } + }, {}); - const { epochsNumber } = await this.blockchainModuleManager.getAgreementData( - event.blockchainId, - agreementId, + const { + blockchainId, + assetContract: contract, + tokenId, + keyword, + hashFunctionId, + state: assertionId, + startTime, + epochsNumber, + epochLength, + // TODO: Uncomment when these arguments are added to the ServiceAgreementV1Created event + // scoreFunctionId, + // proofWindowOffsetPerc, + } = combinedData; + + const agreementId = this.serviceAgreementService.generateId( + blockchainId, + contract, + tokenId, + keyword, + hashFunctionId, ); + const agreementRecord = + await this.repositoryModuleManager.getServiceAgreementRecord(agreementId); + if (agreementRecord) { + this.logger.trace( + `Skipping processing of asset created event, agreement data present in database for agreement id: ${agreementId} on blockchain ${blockchainId}`, + ); + } else { + // TODO: Remove when added to the event + const { scoreFunctionId, proofWindowOffsetPerc } = + await this.blockchainModuleManager.getAgreementData( + blockchainId, + agreementId, + ); + + await this.repositoryModuleManager.updateServiceAgreementRecord( + blockchainId, + contract, + tokenId, + agreementId, + startTime, + epochsNumber, + epochLength, + scoreFunctionId, + proofWindowOffsetPerc, + hashFunctionId, + keyword, + assertionId, + 0, + SERVICE_AGREEMENT_SOURCES.EVENT, + ); + } + }), + ); + } + + async handleServiceAgreementV1ExtendedEvents(blockEvents) { + await Promise.all( + blockEvents.map(async (event) => { + const { agreementId, epochsNumber } = JSON.parse(event.data); + return this.repositoryModuleManager.updateServiceAgreementEpochsNumber( agreementId, epochsNumber, @@ -466,7 +620,7 @@ class BlockchainEventListenerService { for (const event of blockEvents) { const eventData = JSON.parse(event.data); - const { tokenId, keyword, state, stateIndex } = eventData; + const { tokenId, keyword, hashFunctionId, state, stateIndex } = eventData; const blockchain = event.blockchainId; const contract = eventData.assetContract; this.logger.trace( @@ -479,7 +633,7 @@ class BlockchainEventListenerService { // eslint-disable-next-line no-await-in-loop await Promise.all([ - this._handleStateFinalizedEvent( + this.pendingStorageService.moveAndDeletePendingState( TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY, PENDING_STORAGE_REPOSITORIES.PUBLIC, @@ -487,10 +641,11 @@ class BlockchainEventListenerService { contract, tokenId, keyword, + hashFunctionId, 
state, stateIndex, ), - this._handleStateFinalizedEvent( + this.pendingStorageService.moveAndDeletePendingState( TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT, TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY, PENDING_STORAGE_REPOSITORIES.PRIVATE, @@ -498,148 +653,13 @@ class BlockchainEventListenerService { contract, tokenId, keyword, + hashFunctionId, state, stateIndex, ), ]); } } - - async _handleStateFinalizedEvent( - currentRepository, - historyRepository, - pendingRepository, - blockchain, - contract, - tokenId, - keyword, - assertionId, - stateIndex, - ) { - const assertionLinks = await this.tripleStoreService.getAssetAssertionLinks( - currentRepository, - blockchain, - contract, - tokenId, - ); - const storedAssertionIds = assertionLinks.map(({ assertion }) => - assertion.replace('assertion:', ''), - ); - - // event already handled - if (storedAssertionIds.includes(assertionId)) { - return; - } - - // move old assertions to history repository - await Promise.all( - storedAssertionIds.map((storedAssertionId) => - this.tripleStoreService.moveAsset( - currentRepository, - historyRepository, - storedAssertionId, - blockchain, - contract, - tokenId, - keyword, - ), - ), - ); - - await this.tripleStoreService.deleteAssetMetadata( - currentRepository, - blockchain, - contract, - tokenId, - ); - - const cachedData = await this.pendingStorageService.getCachedAssertion( - pendingRepository, - blockchain, - contract, - tokenId, - assertionId, - ); - - const storePromises = []; - if (cachedData?.public?.assertion) { - // insert public assertion in current repository - storePromises.push( - this.tripleStoreService.localStoreAsset( - currentRepository, - assertionId, - cachedData.public.assertion, - blockchain, - contract, - tokenId, - keyword, - ), - ); - - if ( - currentRepository === TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT && - cachedData.agreementId && - cachedData.agreementData - ) { - const serviceAgreement = - await this.repositoryModuleManager.getServiceAgreementRecord( - cachedData.agreementId, - ); - - await this.repositoryModuleManager.updateServiceAgreementRecord( - blockchain, - contract, - tokenId, - cachedData.agreementId, - cachedData.agreementData.startTime, - cachedData.agreementData.epochsNumber, - cachedData.agreementData.epochLength, - cachedData.agreementData.scoreFunctionId, - cachedData.agreementData.proofWindowOffsetPerc, - CONTENT_ASSET_HASH_FUNCTION_ID, - keyword, - assertionId, - stateIndex, - serviceAgreement?.lastCommitEpoch, - serviceAgreement?.lastProofEpoch, - ); - } - } else if (currentRepository === TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT) { - await this.repositoryModuleManager.removeServiceAgreementRecord( - blockchain, - contract, - tokenId, - ); - } - - if (cachedData?.private?.assertion && cachedData?.private?.assertionId) { - // insert private assertion in current repository - storePromises.push( - this.tripleStoreService.localStoreAsset( - currentRepository, - cachedData.private.assertionId, - cachedData.private.assertion, - blockchain, - contract, - tokenId, - keyword, - ), - ); - } - - await Promise.all(storePromises); - - // remove asset from pending storage - if (cachedData) { - await this.pendingStorageService.removeCachedAssertion( - pendingRepository, - blockchain, - contract, - tokenId, - assertionId, - ); - } - } } export default BlockchainEventListenerService; diff --git a/src/service/file-service.js b/src/service/file-service.js index bc5b60098a..34c1c68c45 100644 --- a/src/service/file-service.js +++ b/src/service/file-service.js @@ -141,6 +141,40 @@ 
class FileService { ); } + async getPendingStorageLatestDocument(repository, blockchain, contract, tokenId) { + const pendingStorageFolder = this.getPendingStorageFolderPath( + repository, + blockchain, + contract, + tokenId, + ); + + let latestFile; + let latestMtime = 0; + try { + const files = await readdir(pendingStorageFolder); + + for (const file of files) { + const filePath = path.join(pendingStorageFolder, file); + // eslint-disable-next-line no-await-in-loop + const stats = await stat(filePath); + + if (stats.mtimeMs > latestMtime) { + latestFile = file; + latestMtime = stats.mtimeMs; + } + } + } catch (error) { + if (error.code === 'ENOENT') { + this.logger.debug(`Folder not found at path: ${pendingStorageFolder}`); + return false; + } + throw error; + } + + return latestFile ?? false; + } + async getPendingStorageDocumentPath(repository, blockchain, contract, tokenId, assertionId) { const pendingStorageFolder = this.getPendingStorageFolderPath( repository, diff --git a/src/service/get-service.js b/src/service/get-service.js index 2fe8f37473..d9106d2d03 100644 --- a/src/service/get-service.js +++ b/src/service/get-service.js @@ -6,6 +6,7 @@ import { ERROR_TYPE, OPERATIONS, OPERATION_REQUEST_STATUS, + TRIPLE_STORE_REPOSITORIES, } from '../constants/constants.js'; class GetService extends OperationService { @@ -20,6 +21,9 @@ class GetService extends OperationService { OPERATION_ID_STATUS.GET.GET_END, OPERATION_ID_STATUS.COMPLETED, ]; + this.ualService = ctx.ualService; + this.tripleStoreService = ctx.tripleStoreService; + this.blockchainModuleManager = ctx.blockchainModuleManager; this.operationMutex = new Mutex(); } @@ -32,6 +36,11 @@ class GetService extends OperationService { keyword, batchSize, minAckResponses, + contract, + tokenId, + assertionId, + assetSync, + stateIndex, } = command.data; const keywordsStatuses = await this.getResponsesStatuses( @@ -70,6 +79,24 @@ class GetService extends OperationService { this.completedStatuses, ); this.logResponsesSummary(completedNumber, failedNumber); + + if (assetSync) { + const ual = this.ualService.deriveUAL(blockchain, contract, tokenId); + + this.logger.debug( + `ASSET_SYNC: ${responseData.nquads.length} nquads found for asset with ual: ${ual}, state index: ${stateIndex}, assertionId: ${assertionId}`, + ); + + await this.tripleStoreService.localStoreAsset( + TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, + assertionId, + responseData.nquads, + blockchain, + contract, + tokenId, + keyword, + ); + } } if ( @@ -89,6 +116,12 @@ class GetService extends OperationService { this.completedStatuses, ); this.logResponsesSummary(completedNumber, failedNumber); + if (assetSync) { + const ual = this.ualService.deriveUAL(blockchain, contract, tokenId); + this.logger.debug( + `ASSET_SYNC: No nquads found for asset with ual: ${ual}, state index: ${stateIndex}, assertionId: ${assertionId}`, + ); + } } else { await this.scheduleOperationForLeftoverNodes(command.data, leftoverNodes); } diff --git a/src/service/hashing-service.js b/src/service/hashing-service.js index 0a0da3fab5..a9404ed8f9 100644 --- a/src/service/hashing-service.js +++ b/src/service/hashing-service.js @@ -10,7 +10,7 @@ class HashingService { }; } - async callHashFunction(hashFunctionId, data) { + callHashFunction(hashFunctionId, data) { const hashFunctionName = this.getHashFunctionName(hashFunctionId); return this[hashFunctionName](data); } @@ -19,7 +19,7 @@ class HashingService { return this.hashFunctions[hashFunctionId]; } - async sha256(data) { + sha256(data) { if 
(!ethers.utils.isBytesLike(data)) { const bytesLikeData = ethers.utils.toUtf8Bytes(data); return ethers.utils.sha256(bytesLikeData); diff --git a/src/service/pending-storage-service.js b/src/service/pending-storage-service.js index 0817453d27..1269e760f0 100644 --- a/src/service/pending-storage-service.js +++ b/src/service/pending-storage-service.js @@ -1,8 +1,17 @@ +import { + CONTENT_ASSET_HASH_FUNCTION_ID, + SERVICE_AGREEMENT_SOURCES, +} from '../constants/constants.js'; + class PendingStorageService { constructor(ctx) { this.logger = ctx.logger; this.fileService = ctx.fileService; this.ualService = ctx.ualService; + this.serviceAgreementService = ctx.serviceAgreementService; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.blockchainModuleManager = ctx.blockchainModuleManager; + this.tripleStoreService = ctx.tripleStoreService; } async cacheAssertion( @@ -116,6 +125,154 @@ class PendingStorageService { return false; } } + + async getPendingState(repository, blockchain, contract, tokenId) { + return this.fileService.getPendingStorageLatestDocument( + repository, + blockchain, + contract, + tokenId, + ); + } + + async moveAndDeletePendingState( + currentRepository, + historyRepository, + pendingRepository, + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + assertionId, + stateIndex, + ) { + const agreementId = this.serviceAgreementService.generateId( + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + ); + + let serviceAgreementData = await this.repositoryModuleManager.getServiceAgreementRecord( + agreementId, + ); + if (!serviceAgreementData) { + serviceAgreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + agreementId, + ); + } + + await this.repositoryModuleManager.updateServiceAgreementRecord( + blockchain, + contract, + tokenId, + agreementId, + serviceAgreementData.startTime, + serviceAgreementData.epochsNumber, + serviceAgreementData.epochLength, + serviceAgreementData.scoreFunctionId, + serviceAgreementData.proofWindowOffsetPerc, + CONTENT_ASSET_HASH_FUNCTION_ID, + keyword, + assertionId, + stateIndex, + serviceAgreementData.dataSource ?? 
SERVICE_AGREEMENT_SOURCES.BLOCKCHAIN, + serviceAgreementData?.lastCommitEpoch, + serviceAgreementData?.lastProofEpoch, + ); + + const assertionLinks = await this.tripleStoreService.getAssetAssertionLinks( + currentRepository, + blockchain, + contract, + tokenId, + ); + const storedAssertionIds = assertionLinks.map(({ assertion }) => + assertion.replace('assertion:', ''), + ); + + // event already handled + if (storedAssertionIds.includes(assertionId)) { + return; + } + + // move old assertions to history repository + await Promise.all( + storedAssertionIds.map((storedAssertionId) => + this.tripleStoreService.moveAsset( + currentRepository, + historyRepository, + storedAssertionId, + blockchain, + contract, + tokenId, + keyword, + ), + ), + ); + + await this.tripleStoreService.deleteAssetMetadata( + currentRepository, + blockchain, + contract, + tokenId, + ); + + const cachedData = await this.getCachedAssertion( + pendingRepository, + blockchain, + contract, + tokenId, + assertionId, + ); + + const storePromises = []; + if (cachedData?.public?.assertion) { + // insert public assertion in current repository + storePromises.push( + this.tripleStoreService.localStoreAsset( + currentRepository, + assertionId, + cachedData.public.assertion, + blockchain, + contract, + tokenId, + keyword, + ), + ); + } + + if (cachedData?.private?.assertion && cachedData?.private?.assertionId) { + // insert private assertion in current repository + storePromises.push( + this.tripleStoreService.localStoreAsset( + currentRepository, + cachedData.private.assertionId, + cachedData.private.assertion, + blockchain, + contract, + tokenId, + keyword, + ), + ); + } + + await Promise.all(storePromises); + + // remove asset from pending storage + if (cachedData) { + await this.removeCachedAssertion( + pendingRepository, + blockchain, + contract, + tokenId, + assertionId, + ); + } + } } export default PendingStorageService; diff --git a/src/service/proximity-scoring-service.js b/src/service/proximity-scoring-service.js index 74327f5e9b..cf62d708f4 100644 --- a/src/service/proximity-scoring-service.js +++ b/src/service/proximity-scoring-service.js @@ -95,7 +95,7 @@ class ProximityScoringService { } = log2PLDSFParams; const mappedStake = this.blockchainModuleManager - .toBigNumber(blockchain, this.blockchainModuleManager.convertToWei(blockchain, stake)) + .convertToWei(blockchain, stake) .div(stakeMappingCoefficient); const mappedDistance = distance.div(distanceMappingCoefficient); @@ -121,18 +121,9 @@ class ProximityScoringService { ) { const linearSumParams = await this.blockchainModuleManager.getLinearSumParams(blockchain); const { distanceScaleFactor, stakeScaleFactor, w1, w2 } = linearSumParams; - const mappedStake = this.blockchainModuleManager.toBigNumber( - blockchain, - this.blockchainModuleManager.convertToWei(blockchain, stake), - ); - const mappedMinStake = this.blockchainModuleManager.toBigNumber( - blockchain, - this.blockchainModuleManager.convertToWei(blockchain, minStake), - ); - const mappedMaxStake = this.blockchainModuleManager.toBigNumber( - blockchain, - this.blockchainModuleManager.convertToWei(blockchain, maxStake), - ); + const mappedStake = this.blockchainModuleManager.convertToWei(blockchain, stake); + const mappedMinStake = this.blockchainModuleManager.convertToWei(blockchain, minStake); + const mappedMaxStake = this.blockchainModuleManager.convertToWei(blockchain, maxStake); const idealMaxDistanceInNeighborhood = HASH_RING_SIZE.div(nodesNumber).mul( Math.ceil(r2 / 2), diff --git 
a/src/service/service-agreement-service.js b/src/service/service-agreement-service.js
index dd59d5d38a..903ca7805b 100644
--- a/src/service/service-agreement-service.js
+++ b/src/service/service-agreement-service.js
@@ -10,7 +10,7 @@ class ServiceAgreementService {
         this.proximityScoringService = ctx.proximityScoringService;
     }

-    async generateId(blockchain, assetTypeContract, tokenId, keyword, hashFunctionId) {
+    generateId(blockchain, assetTypeContract, tokenId, keyword, hashFunctionId) {
         return this.hashingService.callHashFunction(
             hashFunctionId,
             this.blockchainModuleManager.encodePacked(
@@ -25,6 +25,28 @@
         return Math.floor(Math.random() * (max - min + 1) + min);
     }

+    async calculateBid(blockchain, blockchainAssertionSize, agreementData, r0) {
+        const currentEpoch = await this.calculateCurrentEpoch(
+            agreementData.startTime,
+            agreementData.epochLength,
+            blockchain,
+        );
+
+        // TODO: Consider optimizing to take into account cases where some proofs have already been submitted
+        const epochsLeft = Number(agreementData.epochsNumber) - currentEpoch;
+
+        const divisor = this.blockchainModuleManager
+            .toBigNumber(blockchain, r0)
+            .mul(epochsLeft)
+            .mul(blockchainAssertionSize);
+
+        return agreementData.tokenAmount
+            .add(agreementData.updateTokenAmount)
+            .mul(1024)
+            .div(divisor)
+            .add(1); // add 1 wei because of the precision loss
+    }
+
     async calculateRank(
         blockchain,
         keyword,
@@ -109,6 +131,11 @@
             maxStake,
         );
     }
+
+    async calculateCurrentEpoch(startTime, epochLength, blockchain) {
+        const now = await this.blockchainModuleManager.getBlockchainTimestamp(blockchain);
+        return Math.floor((Number(now) - Number(startTime)) / Number(epochLength));
+    }
 }

 export default ServiceAgreementService;
diff --git a/src/service/sharding-table-service.js b/src/service/sharding-table-service.js
index 72bef67128..9926672b01 100644
--- a/src/service/sharding-table-service.js
+++ b/src/service/sharding-table-service.js
@@ -35,16 +35,17 @@ class ShardingTableService {
         });
     }

-    async pullBlockchainShardingTable(blockchainId) {
+    async pullBlockchainShardingTable(blockchainId, force = false) {
         const lastCheckedBlock = await this.repositoryModuleManager.getLastCheckedBlock(
             blockchainId,
             CONTRACTS.SHARDING_TABLE_CONTRACT,
         );

         if (
-            lastCheckedBlock?.lastCheckedTimestamp &&
-            Date.now() - lastCheckedBlock.lastCheckedTimestamp <
-                DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS
+            !force &&
+            lastCheckedBlock?.lastCheckedTimestamp &&
+            Date.now() - lastCheckedBlock.lastCheckedTimestamp <
+                DEFAULT_BLOCKCHAIN_EVENT_SYNC_PERIOD_IN_MILLS
         ) {
             return;
         }
@@ -187,6 +188,7 @@
         const myPeerId = this.networkModuleManager.getPeerId().toB58String();
         const filteredPeerRecords = peerRecords.filter((peer) => peer.peerId !== myPeerId);
         const sorted = filteredPeerRecords.sort((a, b) => a.ask - b.ask);
+
         let ask;
         if (sorted.length > r1) {
             ask = sorted[r1 - 1].ask;
@@ -197,7 +199,7 @@
         const r0 = await this.blockchainModuleManager.getR0(blockchainId);

         const bidSuggestion = this.blockchainModuleManager
-            .toBigNumber(blockchainId, this.blockchainModuleManager.convertToWei(blockchainId, ask))
+            .convertToWei(blockchainId, ask)
             .mul(kbSize)
             .mul(epochsNumber)
             .mul(r0)
diff --git a/test/unit/mock/blockchain-module-manager-mock.js b/test/unit/mock/blockchain-module-manager-mock.js
index 8236f00ec5..15cede62b1 100644
--- a/test/unit/mock/blockchain-module-manager-mock.js
+++
b/test/unit/mock/blockchain-module-manager-mock.js @@ -22,7 +22,7 @@ class BlockchainModuleManagerMock { } convertToWei(blockchainId, value) { - return ethers.utils.parseUnits(value.toString(), 'ether').toString(); + return ethers.utils.parseUnits(value.toString(), 'ether'); } toBigNumber(blockchain, value) { diff --git a/tools/knowledge-assets-distribution-simulation/mocks/blockchain-module-manager-mock.js b/tools/knowledge-assets-distribution-simulation/mocks/blockchain-module-manager-mock.js index 5651dddee7..b16aa6f387 100644 --- a/tools/knowledge-assets-distribution-simulation/mocks/blockchain-module-manager-mock.js +++ b/tools/knowledge-assets-distribution-simulation/mocks/blockchain-module-manager-mock.js @@ -14,7 +14,7 @@ class BlockchainModuleManagerMock { } convertToWei(blockchain, value, fromUnit = 'ether') { - return ethers.utils.parseUnits(value.toString(), fromUnit).toString(); + return ethers.utils.parseUnits(value.toString(), fromUnit); } toBigNumber(blockchain, value) {
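
For illustration, a minimal standalone sketch of the convertToWei change above (assuming ethers v5; the helper wrapper and the sample numbers below are hypothetical and not part of the patch): parseUnits already returns an ethers BigNumber, so returning it directly rather than its string form lets callers such as the bid-suggestion and scoring paths chain .mul()/.div() without first re-wrapping the value with toBigNumber().

import { ethers } from 'ethers';

// Hypothetical standalone equivalent of the patched convertToWei:
// it now returns an ethers.BigNumber instead of the stringified value.
function convertToWei(value, fromUnit = 'ether') {
    return ethers.utils.parseUnits(value.toString(), fromUnit);
}

// Callers can chain BigNumber arithmetic directly on the result,
// mirroring how the patched bid-suggestion code uses it.
const ask = 0.2; // hypothetical ask value
const bidSuggestion = convertToWei(ask)
    .mul(3) // hypothetical kbSize
    .mul(5) // hypothetical epochsNumber
    .mul(8) // hypothetical r0
    .toString();

console.log(bidSuggestion); // '24000000000000000000'

The test and simulation mocks receive the same change so that code exercised against them sees the identical BigNumber-returning interface.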