Skip to content

Commit

Permalink
Merge pull request #3143 from OriginTrail/v6/development-network
Browse files Browse the repository at this point in the history
OriginTrail Testnet Prerelease v6.3.0.Hotfix 1
  • Loading branch information
djordjekovac authored Apr 19, 2024
2 parents 7419584 + 71cf15a commit 3bcb8a0
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 67 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "6.3.0",
"version": "6.3.0+hotfix.1",
"description": "OTNode V6",
"main": "index.js",
"type": "module",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
/* eslint-disable no-await-in-loop */
import { setTimeout as sleep } from 'timers/promises';
import Command from '../../command.js';
import {
CONTENT_ASSET_HASH_FUNCTION_ID,
EXPECTED_TRANSACTION_ERRORS,
GET_ASSERTION_IDS_MAX_RETRY_COUNT,
GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS,
GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE,
GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID,
GET_LATEST_SERVICE_AGREEMENT_FREQUENCY_MILLS,
SERVICE_AGREEMENT_SOURCES,
} from '../../../constants/constants.js';

const BATCH_SIZE = 50;

class BlockchainGetLatestServiceAgreement extends Command {
constructor(ctx) {
super(ctx);
Expand All @@ -27,37 +31,69 @@ class BlockchainGetLatestServiceAgreement extends Command {
const assetStorageContractAddresses =
this.blockchainModuleManager.getAssetStorageContractAddresses(blockchain);

await Promise.all(
const results = await Promise.all(
assetStorageContractAddresses.map((contract) =>
this.updateAgreementDataForAssetContract(contract, blockchain),
this.updateAgreementDataForAssetContract(
contract,
blockchain,
command.data[contract],
),
),
);

results.forEach((result) => {
if (result) {
// eslint-disable-next-line no-param-reassign
command.data[result.contract] = result.lastProcessedTokenId;
}
});

return Command.repeat();
}

async updateAgreementDataForAssetContract(contract, blockchain) {
async updateAgreementDataForAssetContract(contract, blockchain, lastProcessedTokenId) {
this.logger.info(
`Get latest service agreement: Starting get latest service agreement command for blockchain: ${blockchain}`,
`Get latest service agreement: Starting get latest service agreement command, last processed token id: ${lastProcessedTokenId} for blockchain: ${blockchain}`,
);
let latestBlockchainTokenId;
try {
latestBlockchainTokenId = await this.blockchainModuleManager.getLatestTokenId(
blockchain,
contract,
);
latestBlockchainTokenId =
Number(await this.blockchainModuleManager.getLatestTokenId(blockchain, contract)) -
GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID;
} catch (error) {
if (error.message.includes(EXPECTED_TRANSACTION_ERRORS.NO_MINTED_ASSETS)) {
this.logger.info(
`Get latest service agreement: No minted assets on blockchain: ${blockchain}`,
);
return;
}
throw error;
this.logger.error(
`Unable to process agreement data for asset contract ${contract}. Error: ${error}`,
);
return;
}

const latestDbTokenId =
(await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ?? 0;
lastProcessedTokenId ??
(await this.repositoryModuleManager.getLatestServiceAgreementTokenId(blockchain)) ??
0;

if (latestBlockchainTokenId < latestDbTokenId) {
this.logger.debug(
`Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`,
);
return {
contract,
lastProcessedTokenId: latestDbTokenId,
};
}

if (latestBlockchainTokenId < latestDbTokenId) {
this.logger.debug(
`Get latest service agreement: No new agreements found on blockchain: ${blockchain}.`,
);
return;
}

this.logger.debug(
`Get latest service agreement: Latest token id on chain: ${latestBlockchainTokenId}, latest token id in database: ${latestDbTokenId} on blockchain: ${blockchain}`,
Expand All @@ -75,26 +111,28 @@ class BlockchainGetLatestServiceAgreement extends Command {
);
if (
getAgreementDataPromise.length === tokenIdDifference ||
getAgreementDataPromise.length === BATCH_SIZE
getAgreementDataPromise.length === GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE
) {
// eslint-disable-next-line no-await-in-loop
const missingAgreements = await Promise.all(getAgreementDataPromise);

// eslint-disable-next-line no-await-in-loop
await this.repositoryModuleManager.bulkCreateServiceAgreementRecords(
missingAgreements.filter((agreement) => agreement != null),
);
getAgreementDataPromise = [];
tokenIdDifference -= BATCH_SIZE;
tokenIdDifference -= GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE;
}
}
if (latestBlockchainTokenId - latestDbTokenId !== 0) {
if (latestBlockchainTokenId - latestDbTokenId > 0) {
this.logger.debug(
`Get latest service agreement: Successfully fetched ${
latestBlockchainTokenId - latestDbTokenId
} on blockchain: ${blockchain}`,
);
}
return {
contract,
lastProcessedTokenId: latestDbTokenId,
};
}

async getAgreementDataForToken(
Expand All @@ -103,56 +141,76 @@ class BlockchainGetLatestServiceAgreement extends Command {
contract,
hashFunctionId = CONTENT_ASSET_HASH_FUNCTION_ID,
) {
this.logger.debug(
`Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`,
);
const assertionIds = await this.blockchainModuleManager.getAssertionIds(
blockchain,
contract,
tokenId,
);
const keyword = await this.ualService.calculateLocationKeyword(
blockchain,
contract,
tokenId,
assertionIds[0],
);
const agreementId = await this.serviceAgreementService.generateId(
blockchain,
contract,
tokenId,
keyword,
hashFunctionId,
);
const agreementData = await this.blockchainModuleManager.getAgreementData(
blockchain,
agreementId,
);
try {
this.logger.debug(
`Get latest service agreement: Getting agreement data for token id: ${tokenId} on blockchain: ${blockchain}`,
);
let assertionIds = [];
let retryCount = 0;

while (assertionIds.length === 0) {
if (retryCount === GET_ASSERTION_IDS_MAX_RETRY_COUNT) {
throw Error(
`Get latest service agreement: Unable to get assertion ids for token id: ${tokenId} on blockchain: ${blockchain}`,
);
}
this.logger.debug(
`Get latest service agreement: getting assertion ids retry ${retryCount} for token id: ${tokenId} on blockchain: ${blockchain}`,
);
assertionIds = await this.blockchainModuleManager.getAssertionIds(
blockchain,
contract,
tokenId,
);
retryCount += 1;
await sleep(GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS * 1000);
}

if (!agreementData) {
this.logger.warn(
`Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`,
const keyword = await this.ualService.calculateLocationKeyword(
blockchain,
contract,
tokenId,
assertionIds[0],
);
const agreementId = await this.serviceAgreementService.generateId(
blockchain,
contract,
tokenId,
keyword,
hashFunctionId,
);
const agreementData = await this.blockchainModuleManager.getAgreementData(
blockchain,
agreementId,
);
}

const latestStateIndex = assertionIds.length - 1;
if (!agreementData) {
throw Error(
`Get latest service agreement: Unable to fetch agreement data while processing asset created event for agreement id: ${agreementId}, blockchain id: ${blockchain}`,
);
}

return {
blockchainId: blockchain,
assetStorageContractAddress: contract,
tokenId,
agreementId,
startTime: agreementData.startTime,
epochsNumber: agreementData.epochsNumber,
epochLength: agreementData.epochLength,
scoreFunctionId: agreementData.scoreFunctionId,
stateIndex: latestStateIndex,
assertionId: assertionIds[latestStateIndex],
hashFunctionId,
keyword,
proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc,
dataSource: SERVICE_AGREEMENT_SOURCES.NODE,
};
const latestStateIndex = assertionIds.length - 1;

return {
blockchainId: blockchain,
assetStorageContractAddress: contract,
tokenId,
agreementId,
startTime: agreementData.startTime,
epochsNumber: agreementData.epochsNumber,
epochLength: agreementData.epochLength,
scoreFunctionId: agreementData.scoreFunctionId,
stateIndex: latestStateIndex,
assertionId: assertionIds[latestStateIndex],
hashFunctionId,
keyword,
proofWindowOffsetPerc: agreementData.proofWindowOffsetPerc,
dataSource: SERVICE_AGREEMENT_SOURCES.NODE,
};
} catch (error) {
this.logger.error(error.message);
}
}

/**
Expand Down
8 changes: 8 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,14 @@ export const ARCHIVE_UPDATE_RESPONSES_FOLDER = 'update_responses';
*/
export const COMMAND_QUEUE_PARALLELISM = 100;

export const GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE = 50;

export const GET_ASSERTION_IDS_MAX_RETRY_COUNT = 5;

export const GET_ASSERTION_IDS_RETRY_DELAY_IN_SECONDS = 2;

export const GET_LATEST_SERVICE_AGREEMENT_EXCLUDE_LATEST_TOKEN_ID = 1;

/**
* @constant {object} HTTP_API_ROUTES -
* HTTP API Routes with parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ServiceAgreementRepository {

async bulkCreateServiceAgreementRecords(serviceAgreements) {
return this.model.bulkCreate(serviceAgreements, {
validate: true,
ignoreDuplicates: true,
});
}

Expand Down

0 comments on commit 3bcb8a0

Please sign in to comment.