diff --git a/package-lock.json b/package-lock.json index 673c2e145f..3a18c9d491 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.2.4", + "version": "6.3.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.2.4", + "version": "6.3.0", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 1eae3e6d32..57fad629e8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.2.4", + "version": "6.3.0", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js b/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js new file mode 100644 index 0000000000..26e79a6f2a --- /dev/null +++ b/src/commands/common/get-latest-service-agreement/blockchain-get-latest-service-agreement.js @@ -0,0 +1,250 @@ +/* eslint-disable no-await-in-loop */ +import { setTimeout as sleep } from 'timers/promises'; +import Command from '../../command.js'; +import { + CONTENT_ASSET_HASH_FUNCTION_ID, + EXPECTED_TRANSACTION_ERRORS, + GET_ASSERTION_IDS_MAX_RETRY_COUNT, + GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS, + GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE, + GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID, + GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS, + SERVICE_AGREEMENT_SOURCES, +} from '../../../constants/constants.js'; + +class BlockchainGetLatestServiceAgreement extends Command { + constructor(ctx) { + super(ctx); + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.blockchainModuleManager = ctx.blockchainModuleManager; + this.serviceAgreementService = ctx.serviceAgreementService; + this.ualService = ctx.ualService; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute(command) { + const { blockchain } = command.data; + + const assetStorageContractAddresses = + this.blockchainModuleManager.getAssetStorageContractAddresses(blockchain); + + const results = await Promise.all( + assetStorageContractAddresses.map((contract) => + this.updateAgreementDataForAssetContract( + contract, + blockchain, + command.data[contract], + ), + ), + ); + + results.forEach((result) => { + if (result) { + // eslint-disable-next-line no-param-reassign + command.data[result.contract] = result.lastProcessedTokenId; + this.logger.debug( + `Get latest service agreement: updating last processed token id: ${result.lastProcessedTokenId} for blockchain ${blockchain}`, + ); + } + }); + + return Command.repeat(); + } + + async updateAgreementDataForAssetContract(contract, blockchain, lastProcessedTokenId) { + this.logger.info( + `Get latest service agreement: Starting get latest service agreement command, last processed token id: ${lastProcessedTokenId} for blockchain: ${blockchain}`, + ); + let latestBlockchainTokenId; + try { + latestBlockchainTokenId = + Number(await this.blockchainModuleManager.getLatestTokenId(blockchain, contract)) - + GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID; + } catch (error) { + if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NO_MINTED_ASSETS)) { + this.logger.info( + `Get latest service agreement: No minted assets on blockchain: ${blockchain}`, + ); + return; + } + this.logger.error( + `Unable to process agreement data for asset contract ${contract}. Error: ${error}`, + ); + return; + } + + const latestDbTokenId = + lastProcessedTokenId ?? + (await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ?? + latestBlockchainTokenId; + + if (latestBlockchainTokenId < latestDbTokenId) { + this.logger.debug( + `Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`, + ); + return { + contract, + lastProcessedTokenId: latestDbTokenId, + }; + } + + if (latestBlockchainTokenId < latestDbTokenId) { + this.logger.debug( + `Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`, + ); + return; + } + + this.logger.debug( + `Get latest service agreement: Latest token id on chain: ${latestBlockchainTokenId}, latest token id in database: ${latestDbTokenId} on blockchain: ${blockchain}`, + ); + + let tokenIdDifference = latestBlockchainTokenId - latestDbTokenId; + let getAgreementDataPromise = []; + for ( + let tokenIdToBeFetched = latestDbTokenId + 1; + tokenIdToBeFetched <= latestBlockchainTokenId; + tokenIdToBeFetched += 1 + ) { + getAgreementDataPromise.push( + this.getAgreementDataForToken(tokenIdToBeFetched, blockchain, contract), + ); + if ( + getAgreementDataPromise.length === tokenIdDifference || + getAgreementDataPromise.length === GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE + ) { + const missingAgreements = await Promise.all(getAgreementDataPromise); + + await this.repositoryModuleManager.bulkCreateServiceAgreementRecords( + missingAgreements.filter((agreement) => agreement != null), + ); + getAgreementDataPromise = []; + tokenIdDifference -= GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE; + } + } + if (latestBlockchainTokenId - latestDbTokenId > 0) { + this.logger.debug( + `Get latest service agreement: Successfully fetched ${ + latestBlockchainTokenId - latestDbTokenId + } on blockchain: ${blockchain}`, + ); + } + return { + contract, + lastProcessedTokenId: latestBlockchainTokenId, + }; + } + + async getAgreementDataForToken( + tokenId, + blockchain, + contract, + hashFunctionId = CONTENT_ASSET_HASH_FUNCTION_ID, + ) { + try { + if (await this.repositoryModuleManager.serviceAgreementExists(blockchain, tokenId)) { + this.logger.debug( + `Get latest service agreement: data exists in repository for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + return; + } + this.logger.debug( + `Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + let assertionIds = []; + let retryCount = 0; + + while (assertionIds.length === 0) { + if (retryCount === GET_ASSERTION_IDS_MAX_RETRY_COUNT) { + throw Error( + `Get latest service agreement: Unable to get assertion ids for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + } + this.logger.debug( + `Get latest service agreement: getting assertion ids retry ${retryCount} for token id: ${tokenId} on blockchain: ${blockchain}`, + ); + assertionIds = await this.blockchainModuleManager.getAssertionIds( + blockchain, + contract, + tokenId, + ); + retryCount += 1; + await sleep(GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS * 1000); + } + + const keyword = await this.ualService.calculateLocationKeyword( + blockchain, + contract, + tokenId, + assertionIds[0], + ); + const agreementId = await this.serviceAgreementService.generateId( + blockchain, + contract, + tokenId, + keyword, + hashFunctionId, + ); + const agreementData = await this.blockchainModuleManager.getAgreementData( + blockchain, + agreementId, + ); + + if (!agreementData) { + throw Error( + `Get latest service agreement: Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`, + ); + } + + const latestStateIndex = assertionIds.length - 1; + + return { + blockchainId: blockchain, + assetStorageContractAddress: contract, + tokenId, + agreementId, + startTime: agreementData.startTime, + epochsNumber: agreementData.epochsNumber, + epochLength: agreementData.epochLength, + scoreFunctionId: agreementData.scoreFunctionId, + stateIndex: latestStateIndex, + assertionId: assertionIds[latestStateIndex], + hashFunctionId, + keyword, + proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc, + dataSource: SERVICE_AGREEMENT_SOURCES.NODE, + }; + } catch (error) { + this.logger.error(error.message); + } + } + + /** + * Recover system from failure + * @param error + */ + async recover() { + return Command.repeat(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'blockchainGetLatestServiceAgreement', + data: {}, + period: GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +export default BlockchainGetLatestServiceAgreement; diff --git a/src/commands/common/get-latest-service-agreement/get-latest-service-agreement.js b/src/commands/common/get-latest-service-agreement/get-latest-service-agreement.js new file mode 100644 index 0000000000..c951534637 --- /dev/null +++ b/src/commands/common/get-latest-service-agreement/get-latest-service-agreement.js @@ -0,0 +1,71 @@ +import Command from '../../command.js'; +import { GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS } from '../../../constants/constants.js'; + +class GetLatestServiceAgreement extends Command { + constructor(ctx) { + super(ctx); + this.commandExecutor = ctx.commandExecutor; + this.shardingTableService = ctx.shardingTableService; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.blockchainModuleManager = ctx.blockchainModuleManager; + } + + /** + * Executes command and produces one or more events + * @param command + */ + async execute() { + const operationId = this.operationIdService.generateId(); + + this.logger.info( + `Get latest service agreement: Starting get latest service agreement command for operation id: ${operationId}`, + ); + + await this.commandExecutor.delete('blockchainGetLatestServiceAgreement'); + + await Promise.all( + this.blockchainModuleManager.getImplementationNames().map(async (blockchain) => { + const commandData = { + blockchain, + operationId, + }; + + return this.commandExecutor.add({ + name: 'blockchainGetLatestServiceAgreement', + data: commandData, + period: GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS, + }); + }), + ); + + return Command.empty(); + } + + /** + * Recover system from failure + * @param command + * @param error + */ + async recover(command) { + this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`); + + return Command.repeat(); + } + + /** + * Builds default command + * @param map + * @returns {{add, data: *, delay: *, deadline: *}} + */ + default(map) { + const command = { + name: 'getLatestServiceAgreement', + data: {}, + transactional: false, + }; + Object.assign(command, map); + return command; + } +} + +export default GetLatestServiceAgreement; diff --git a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js index 8cc0e6a5e5..baea3a9ec2 100644 --- a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js +++ b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js @@ -493,12 +493,9 @@ class BlockchainEpochCheckCommand extends Command { /** * Recover system from failure - * @param command * @param error */ - async recover(command) { - this.logger.warn(`Failed to execute ${command.name}. Error: ${command.message}`); - + async recover() { return Command.repeat(); } diff --git a/src/constants/constants.js b/src/constants/constants.js index 46d710dbdc..be2b6d2da2 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -160,6 +160,8 @@ export const REMOVE_SESSION_COMMAND_DELAY = 2 * 60 * 1000; export const OPERATION_IDS_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +export const GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS = 30 * 1000; + export const DIAL_PEERS_COMMAND_FREQUENCY_MILLS = 30 * 1000; export const DIAL_PEERS_CONCURRENCY = 10; @@ -174,6 +176,7 @@ export const PERMANENT_COMMANDS = [ 'commandsCleanerCommand', 'dialPeersCommand', 'epochCheckCommand', + 'getLatestServiceAgreement', 'blockchainEventCleanerCommand', 'getCleanerCommand', 'getResponseCleanerCommand', @@ -464,6 +467,7 @@ export const EXPECTED_TRANSACTION_ERRORS = { PROOF_WINDOW_CLOSED: 'ProofWindowClosed', NODE_NOT_AWARDED: 'NodeNotAwarded', WRONG_MERKLE_PROOF: 'WrongMerkleProof', + NO_MINTED_ASSETS: 'NoMintedAssets', }; /** @@ -544,6 +548,14 @@ export const ARCHIVE_UPDATE_RESPONSES_FOLDER = 'update_responses'; */ export const COMMAND_QUEUE_PARALLELISM = 100; +export const GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE = 50; + +export const GET_ASSERTION_IDS_MAX_RETRY_COUNT = 5; + +export const GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS = 2; + +export const GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID = 1; + /** * @constant {object} HTTP_API_ROUTES - * HTTP API Routes with parameters @@ -661,24 +673,14 @@ export const CONTRACT_EVENTS = { SHARDING_TABLE: ['NodeAdded', 'NodeRemoved'], STAKING: ['StakeIncreased', 'StakeWithdrawalStarted'], PROFILE: ['AskUpdated'], - CONTENT_ASSET: ['AssetMinted'], COMMIT_MANAGER_V1: ['StateFinalized'], - SERVICE_AGREEMENT_V1: [ - 'ServiceAgreementV1Created', - 'ServiceAgreementV1Extended', - 'ServiceAgreementV1Terminated', - ], + SERVICE_AGREEMENT_V1: ['ServiceAgreementV1Extended', 'ServiceAgreementV1Terminated'], PARAMETERS_STORAGE: ['ParameterChanged'], LOG2PLDSF: ['ParameterChanged'], LINEAR_SUM: ['ParameterChanged'], }; -export const GROUPED_CONTRACT_EVENTS = { - AssetCreatedGroup: { - events: ['AssetMinted', 'ServiceAgreementV1Created'], - groupingKey: 'tokenId', - }, -}; +export const GROUPED_CONTRACT_EVENTS = {}; export const CONTRACT_EVENT_TO_GROUP_MAPPING = (() => { const mapping = {}; diff --git a/src/controllers/http-api/v0/bid-suggestion-http-api-controller-v0.js b/src/controllers/http-api/v0/bid-suggestion-http-api-controller-v0.js index e549db4bcf..93fe75c81e 100644 --- a/src/controllers/http-api/v0/bid-suggestion-http-api-controller-v0.js +++ b/src/controllers/http-api/v0/bid-suggestion-http-api-controller-v0.js @@ -51,15 +51,10 @@ class BidSuggestionController extends BaseController { firstAssertionId, hashFunctionId, } = req.query; - let { proximityScoreFunctionsPairId, bidSuggestionRange } = req.query; + let { bidSuggestionRange } = req.query; try { - // TODO: ADD-DOCS - if (!proximityScoreFunctionsPairId) { - if (blockchain.startsWith('otp') || blockchain.startsWith('hardhat1')) - proximityScoreFunctionsPairId = 1; - else if (blockchain.startsWith('gnosis') || blockchain.startsWith('hardhat2')) - proximityScoreFunctionsPairId = 2; - } + const proximityScoreFunctionsPairId = 2; + if (!bidSuggestionRange) { bidSuggestionRange = LOW_BID_SUGGESTION; } diff --git a/src/modules/blockchain/implementation/hardhat/hardhat-service.js b/src/modules/blockchain/implementation/hardhat/hardhat-service.js index 315e52644c..5a89c03a27 100644 --- a/src/modules/blockchain/implementation/hardhat/hardhat-service.js +++ b/src/modules/blockchain/implementation/hardhat/hardhat-service.js @@ -26,9 +26,6 @@ class HardhatService extends Web3Service { } async getAgreementScoreFunctionId() { - if (this.getBlockchainId() === 'hardhat1:31337') { - return 1; - } return 2; } } diff --git a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js index 37ba3fa3c1..7ba9c69b4d 100644 --- a/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js +++ b/src/modules/blockchain/implementation/ot-parachain/ot-parachain-service.js @@ -201,10 +201,6 @@ class OtParachainService extends Web3Service { }); return wallets; } - - async getAgreementScoreFunctionId() { - return 1; - } } export default OtParachainService; diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js index 57e8ef9101..062a6876e3 100644 --- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js @@ -83,9 +83,19 @@ class ServiceAgreementRepository { ); } + async serviceAgreementExists(blockchainId, tokenId) { + const agreementRecord = await this.model.findOne({ + where: { + blockchainId, + tokenId, + }, + }); + return !!agreementRecord; + } + async bulkCreateServiceAgreementRecords(serviceAgreements) { return this.model.bulkCreate(serviceAgreements, { - validate: true, + ignoreDuplicates: true, }); } @@ -176,7 +186,10 @@ class ServiceAgreementRepository { [Sequelize.Op.gt]: Sequelize.literal(currentEpoch), }, }, - order: [[Sequelize.col('timeLeftInSubmitCommitWindow'), 'ASC']], + order: [ + ['scoreFunctionId', 'DESC'], + [Sequelize.col('timeLeftInSubmitCommitWindow'), 'ASC'], + ], limit: 100, raw: true, }); @@ -232,7 +245,10 @@ class ServiceAgreementRepository { [Sequelize.Op.gt]: Sequelize.literal(currentEpoch), }, }, - order: [[Sequelize.col('timeLeftInSubmitProofWindow'), 'ASC']], + order: [ + ['scoreFunctionId', 'DESC'], + [Sequelize.col('timeLeftInSubmitProofWindow'), 'ASC'], + ], limit: 100, raw: true, }); @@ -262,6 +278,14 @@ class ServiceAgreementRepository { order: [['token_id', 'asc']], }); } + + async getLatestServiceAgreementTokenId(blockchainId) { + return this.model.max('tokenId', { + where: { + blockchainId, + }, + }); + } } export default ServiceAgreementRepository; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 7e1498e831..9aefcb5535 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -344,6 +344,15 @@ class RepositoryModuleManager extends BaseModuleManager { } } + async serviceAgreementExists(blockchain, tokenId) { + if (this.initialized) { + return this.getRepository('service_agreement').serviceAgreementExists( + blockchain, + tokenId, + ); + } + } + async bulkCreateServiceAgreementRecords(records) { if (this.initialized) { return this.getRepository('service_agreement').bulkCreateServiceAgreementRecords( @@ -449,6 +458,12 @@ class RepositoryModuleManager extends BaseModuleManager { blockchainId, ); } + + async getLatestServiceAgreementTokenId(blockchainId) { + return this.getRepository('service_agreement').getLatestServiceAgreementTokenId( + blockchainId, + ); + } } export default RepositoryModuleManager; diff --git a/src/service/blockchain-event-listener-service.js b/src/service/blockchain-event-listener-service.js index 005d209ceb..62c230bd5d 100644 --- a/src/service/blockchain-event-listener-service.js +++ b/src/service/blockchain-event-listener-service.js @@ -11,7 +11,6 @@ import { DELAY_BETWEEN_FAILED_FETCH_EVENTS_MILLIS, CONTRACT_EVENT_TO_GROUP_MAPPING, GROUPED_CONTRACT_EVENTS, - SERVICE_AGREEMENT_SOURCES, } from '../constants/constants.js'; const fetchEventsFailedCount = {}; @@ -73,12 +72,6 @@ class BlockchainEventListenerService { } const syncContractEventsPromises = [ - this.getContractEvents( - blockchainId, - CONTRACTS.CONTENT_ASSET, - currentBlock, - CONTRACT_EVENTS.CONTENT_ASSET, - ), this.getContractEvents( blockchainId, CONTRACTS.SHARDING_TABLE_CONTRACT, @@ -520,86 +513,6 @@ class BlockchainEventListenerService { ); } - async handleAssetCreatedGroupEvents(blockGroupEvents) { - await Promise.all( - blockGroupEvents.map(async (eventsGroup) => { - // Parse and combine Arguments of both AssetMinted and ServiceAgreementCreated Events - const combinedData = eventsGroup.reduce((accumulator, event) => { - try { - const eventData = JSON.parse(event.data); - return { - ...accumulator, - ...eventData, - blockchainId: event.blockchainId, - }; - } catch (error) { - this.logger.error(`Error parsing event data: ${error}`); - return accumulator; - } - }, {}); - - const { - blockchainId, - assetContract: contract, - tokenId, - keyword, - hashFunctionId, - state: assertionId, - startTime, - epochsNumber, - epochLength, - // TODO: Uncomment when these arguments are added to the ServiceAgreementV1Created event - // scoreFunctionId, - // proofWindowOffsetPerc, - } = combinedData; - - const agreementId = this.serviceAgreementService.generateId( - blockchainId, - contract, - tokenId, - keyword, - hashFunctionId, - ); - - const agreementRecord = - await this.repositoryModuleManager.getServiceAgreementRecord(agreementId); - if (agreementRecord) { - this.logger.trace( - `Skipping processing of asset created event, agreement data present in database for agreement id: ${agreementId} on blockchain ${blockchainId}`, - ); - } else { - const agreementData = await this.blockchainModuleManager.getAgreementData( - blockchainId, - agreementId, - ); - - if (!agreementData) { - this.logger.warn( - `Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchainId}`, - ); - } - - await this.repositoryModuleManager.updateServiceAgreementRecord( - blockchainId, - contract, - tokenId, - agreementId, - startTime, - epochsNumber, - epochLength, - agreementData?.scoreFunctionId ?? 0, - agreementData?.proofWindowOffsetPerc ?? 0, - hashFunctionId, - keyword, - assertionId, - 0, - SERVICE_AGREEMENT_SOURCES.EVENT, - ); - } - }), - ); - } - async handleServiceAgreementV1ExtendedEvents(blockEvents) { await Promise.all( blockEvents.map(async (event) => { diff --git a/tools/local-network-setup/setup-macos-environment.sh b/tools/local-network-setup/setup-macos-environment.sh index 77ddff4529..b7f0dba41a 100755 --- a/tools/local-network-setup/setup-macos-environment.sh +++ b/tools/local-network-setup/setup-macos-environment.sh @@ -51,7 +51,7 @@ then osascript -e "tell app \"Terminal\" do script \"cd $pathToOtNode - node tools/local-network-setup/run-local-blockchain.js 8545 :v1\" + node tools/local-network-setup/run-local-blockchain.js 8545 :v2\" end tell" echo Waiting for hardhat to start and contracts deployment