From 6dcabac0d960ca4d6855306e8d00e9af401f82c5 Mon Sep 17 00:00:00 2001 From: brkagithub Date: Mon, 16 Dec 2024 17:15:13 +0100 Subject: [PATCH 1/9] paranet sync rework (wip) --- src/commands/paranet/paranet-sync-command.js | 334 +++++++++--------- .../20241215122200-create-paranet-assets.js | 65 ++++ .../sequelize/models/missed-paranet-asset.js | 2 + .../sequelize/models/paranet-asset.js | 84 +++++ .../sequelize/models/paranet-synced-asset.js | 1 + .../repositories/paranet-asset-repository.js | 139 ++++++++ .../paranet-synced-asset-repository.js | 1 + .../repository/repository-module-manager.js | 68 ++-- 8 files changed, 503 insertions(+), 191 deletions(-) create mode 100644 src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js create mode 100644 src/modules/repository/implementation/sequelize/models/paranet-asset.js create mode 100644 src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js diff --git a/src/commands/paranet/paranet-sync-command.js b/src/commands/paranet/paranet-sync-command.js index 07d619de47..6b86b17094 100644 --- a/src/commands/paranet/paranet-sync-command.js +++ b/src/commands/paranet/paranet-sync-command.js @@ -11,10 +11,10 @@ import { PARANET_SYNC_RETRY_DELAY_MS, OPERATION_STATUS, PARANET_NODES_ACCESS_POLICIES, - PARANET_SYNC_SOURCES, - TRIPLE_STORE_REPOSITORIES, - LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, - LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, + // PARANET_SYNC_SOURCES, + // TRIPLE_STORE_REPOSITORIES, + // LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, + // LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, } from '../../constants/constants.js'; class ParanetSyncCommand extends Command { @@ -38,7 +38,7 @@ class ParanetSyncCommand extends Command { PARANET_NODES_ACCESS_POLICIES[paranetMetadata.nodesAccessPolicy]; this.logger.info( - `Paranet sync: Starting paranet sync for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}`, + `Paranet sync: Starting paranet sync for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}, access policy ${paranetNodesAccessPolicy}`, ); // Fetch counts from blockchain and database @@ -50,20 +50,12 @@ class ParanetSyncCommand extends Command { ).toNumber(); const syncedAssetsCount = - await this.repositoryModuleManager.getParanetSyncedAssetRecordsCountByDataSource( - paranetUAL, - PARANET_SYNC_SOURCES.SYNC, - ); - const localStoredAssetsCount = - await this.repositoryModuleManager.getParanetSyncedAssetRecordsCountByDataSource( - paranetUAL, - PARANET_SYNC_SOURCES.LOCAL_STORE, - ); + await this.repositoryModuleManager.getParanetSyncedAssetRecordsCount(paranetUAL); const totalMissedAssetsCount = await this.repositoryModuleManager.getCountOfMissedAssetsOfParanet(paranetUAL); const missedAssetsCount = - await this.repositoryModuleManager.getFilteredCountOfMissedAssetsOfParanet( + await this.repositoryModuleManager.getMissedParanetAssetsRecordsWithRetryCount( paranetUAL, PARANET_SYNC_RETRIES_LIMIT, PARANET_SYNC_RETRY_DELAY_MS, @@ -72,7 +64,7 @@ class ParanetSyncCommand extends Command { const paranetRepository = this.paranetService.getParanetRepositoryName(paranetUAL); this.logger.info( - `Paranet sync: Paranet: ${paranetUAL} (${paranetId}) Total count of Paranet KAs in the contract: ${contractKaCount}; Synced KAs count: ${syncedAssetsCount}; Local Stored KAs count: ${localStoredAssetsCount}; Total count of missed KAs: ${totalMissedAssetsCount}`, + `Paranet sync: Paranet: ${paranetUAL} (${paranetId}) Total count of Paranet KAs in the contract: ${contractKaCount}; Synced KAs count: ${syncedAssetsCount}; Total count of missed KAs: ${totalMissedAssetsCount}`, ); // First, attempt to sync missed KAs if any exist @@ -87,7 +79,7 @@ class ParanetSyncCommand extends Command { OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_START, ); - const [successulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs( + const [successfulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs( paranetUAL, paranetId, paranetNodesAccessPolicy, @@ -96,7 +88,7 @@ class ParanetSyncCommand extends Command { ); this.logger.info( - `Paranet sync: Successful missed assets syncs: ${successulMissedSyncsCount}; ` + + `Paranet sync: Successful missed assets syncs: ${successfulMissedSyncsCount}; ` + `Failed missed assets syncs: ${failedMissedSyncsCount} for paranet: ${paranetUAL} ` + `(${paranetId}), operation ID: ${operationId}!`, ); @@ -105,17 +97,16 @@ class ParanetSyncCommand extends Command { operationId, blockchain, OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_END, - successulMissedSyncsCount, + successfulMissedSyncsCount, failedMissedSyncsCount, ); } // Then, check for new KAs on the blockchain - if (syncedAssetsCount + localStoredAssetsCount + totalMissedAssetsCount < contractKaCount) { + if (syncedAssetsCount + totalMissedAssetsCount < contractKaCount) { this.logger.info( `Paranet sync: Syncing ${ - contractKaCount - - (syncedAssetsCount + localStoredAssetsCount + totalMissedAssetsCount) + contractKaCount - (syncedAssetsCount + totalMissedAssetsCount) } new assets for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}`, ); @@ -131,9 +122,9 @@ class ParanetSyncCommand extends Command { contractKaCount, paranetUAL, paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, ); this.logger.info( @@ -204,27 +195,53 @@ class ParanetSyncCommand extends Command { this.logger.debug( `Paranet sync: Get for ${ual} with operation id ${getOperationId} initiated.`, ); - if (paranetNodesAccessPolicy === 'OPEN') { - await this.commandExecutor.add({ - name: 'networkGetCommand', - sequence: [], - delay: 0, - data: { - operationId: getOperationId, - id: ual, - blockchain, - contract, - tokenId, - state: assertionId, - assertionId, - paranetId, - paranetUAL, - }, - transactional: false, - }); - } else if (paranetNodesAccessPolicy === 'CURATED') { + + const maxAttempts = PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS; + const pollingInterval = PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS; + + let attempt = 0; + let getResult; + + await this.commandExecutor.add({ + name: 'localGetCommand', + sequence: [], + delay: 0, + data: { + operationId: getOperationId, + id: ual, + blockchain, + contract, + tokenId, + state: assertionId, + assertionId, + paranetId, + paranetUAL, + }, + transactional: false, + }); + + do { + await setTimeout(pollingInterval); + getResult = await this.operationIdService.getOperationIdRecord(getOperationId); + attempt += 1; + } while ( + attempt < maxAttempts && + getResult?.status !== OPERATION_ID_STATUS.FAILED && + getResult?.status !== OPERATION_ID_STATUS.COMPLETED + ); + + if (getResult?.status !== OPERATION_ID_STATUS.COMPLETED) { + this.logger.info( + `Local GET failed for tokenId: ${tokenId}, attempting network GET.`, + ); + + const networkCommandName = + paranetNodesAccessPolicy === 'OPEN' + ? 'networkGetCommand' + : 'curatedParanetNetworkGetCommand'; + await this.commandExecutor.add({ - name: 'curatedParanetNetworkGetCommand', + name: networkCommandName, sequence: [], delay: 0, data: { @@ -240,38 +257,31 @@ class ParanetSyncCommand extends Command { }, transactional: false, }); - } - await this.operationIdService.updateOperationIdStatus( - getOperationId, - blockchain, - OPERATION_ID_STATUS.GET.GET_INIT_END, - ); - - let attempt = 0; - let getResult; - do { - await setTimeout(PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS); - getResult = await this.operationIdService.getOperationIdRecord(getOperationId); - attempt += 1; - } while ( - attempt < PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS && - getResult?.status !== OPERATION_ID_STATUS.FAILED && - getResult?.status !== OPERATION_ID_STATUS.COMPLETED - ); + attempt = 0; + do { + await setTimeout(pollingInterval); + getResult = await this.operationIdService.getOperationIdRecord(getOperationId); + attempt += 1; + } while ( + attempt < maxAttempts && + getResult?.status !== OPERATION_ID_STATUS.FAILED && + getResult?.status !== OPERATION_ID_STATUS.COMPLETED + ); + } if (getResult?.status !== OPERATION_ID_STATUS.COMPLETED) { this.logger.warn( - `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, GET result: ${JSON.stringify( + `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract}, state index: ${stateIndex}, blockchain: ${blockchain}, GET result: ${JSON.stringify( getResult, )}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchainId: blockchain, + // TODO: if exists, increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal( ual, - paranetUal: paranetUAL, - }); + paranetUAL, + ); return false; } @@ -281,53 +291,55 @@ class ParanetSyncCommand extends Command { `Paranet sync: ${data.assertion.length} nquads found for asset with ual: ${ual}, state index: ${stateIndex}, assertionId: ${assertionId}`, ); - let repository; - if (latestState) { - repository = paranetRepository; - } else if (paranetNodesAccessPolicy === 'OPEN') { - repository = TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY; - } else if (paranetNodesAccessPolicy === 'CURATED') { - repository = TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY; - } else { - throw new Error('Unsupported access policy'); - } + const repository = paranetRepository; - await this.tripleStoreService.localStoreAsset( - repository, - assertionId, - data.assertion, - blockchain, - contract, - tokenId, - LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, - LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, - ); - if (paranetNodesAccessPolicy === 'CURATED' && data.privateAssertion) { - await this.tripleStoreService.localStoreAsset( - repository, - data.syncedAssetRecord.privateAssertionId, - data.privateAssertion, - blockchain, - contract, - tokenId, - ); + const assertions = [data.public, data.private]; + + const storePromises = []; + + for (const assertionData of assertions) { + if (assertionData?.assertion && assertionData?.assertionId) { + storePromises.push( + this.tripleStoreService.insertKnowledgeCollection( + repository, + ual, + assertionData.assertion, + ), + ); + } } - const privateAssertionId = - paranetNodesAccessPolicy === 'CURATED' - ? data.syncedAssetRecord?.privateAssertionId - : null; + + await Promise.all(storePromises); + + // this doesnt work for v8 + // await this.tripleStoreService.localStoreAsset( + // repository, + // assertionId, + // data.assertion, + // blockchain, + // contract, + // tokenId, + // LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, + // LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, + // ); + // if (paranetNodesAccessPolicy === 'CURATED' && data.privateAssertion) { + // await this.tripleStoreService.localStoreAsset( + // repository, + // data.syncedAssetRecord.privateAssertionId, + // data.privateAssertion, + // blockchain, + // contract, + // tokenId, + // ); + // } + // const privateAssertionId = + // paranetNodesAccessPolicy === 'CURATED' + // ? data.syncedAssetRecord?.privateAssertionId + // : null; await this.repositoryModuleManager.incrementParanetKaCount(paranetId, blockchain); - await this.repositoryModuleManager.createParanetSyncedAssetRecord( - blockchain, - ual, - paranetUAL, - assertionId, - privateAssertionId, - data.syncedAssetRecord?.sender, - data.syncedAssetRecord?.transactionHash, - PARANET_SYNC_SOURCES.SYNC, - ); + + await this.repositoryModuleManager.updateAssetToBeSynced(ual, paranetUAL); return true; } catch (error) { @@ -335,11 +347,8 @@ class ParanetSyncCommand extends Command { `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, error: ${error}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchainId: blockchain, - ual, - paranetUal: paranetUAL, - }); + // TODO: probably dont need to do anything here and just leave it unsynced, maybe increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal(ual, paranetUAL); return false; } @@ -355,7 +364,6 @@ class ParanetSyncCommand extends Command { paranetNodesAccessPolicy, paranetRepository, operationId, - removeMissingAssetRecord = false, ) { try { this.logger.info( @@ -388,20 +396,17 @@ class ParanetSyncCommand extends Command { )); } - if (isSuccessful && removeMissingAssetRecord) { - await this.repositoryModuleManager.removeMissedParanetAssetRecordsByUAL(ual); - } + // if (isSuccessful && removeMissingAssetRecord) { + // await this.repositoryModuleManager.syncKnowledgeAssetsByUAL(ual); + // } return isSuccessful; } catch (error) { this.logger.warn( `Paranet sync: Failed to sync asset: ${ual} for paranet: ${paranetId}, error: ${error}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchain, - ual, - paranetUAL, - }); + // TODO: increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal(ual, paranetUAL); return false; } @@ -446,7 +451,6 @@ class ParanetSyncCommand extends Command { paranetNodesAccessPolicy, paranetRepository, operationId, - true, // removeMissingAssetRecord ); }); @@ -468,13 +472,13 @@ class ParanetSyncCommand extends Command { contractKaCount, paranetUAL, paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, ) { let i = Number(startIndex); - const results = []; + // const results = []; while (i <= contractKaCount) { const nextKaArray = await this.blockchainModuleManager.getParanetKnowledgeCollectionsWithPagination( @@ -490,7 +494,7 @@ class ParanetSyncCommand extends Command { i += nextKaArray.length; - const filteredKAs = []; + // const filteredKAs = []; // NOTE: This could also be processed in parallel if needed for (const knowledgeAssetId of nextKaArray) { const { knowledgeAssetStorageContract, tokenId: knowledgeAssetTokenId } = @@ -505,6 +509,7 @@ class ParanetSyncCommand extends Command { knowledgeAssetTokenId, ); + // TODO: can do these two queries in one and just get if asset exists in table const isAlreadySynced = await this.repositoryModuleManager.paranetSyncedAssetRecordExists(ual); @@ -513,46 +518,59 @@ class ParanetSyncCommand extends Command { continue; } + // TODO: can do these two queries in one and just get if asset exists in table const isMissedAsset = - await this.repositoryModuleManager.missedParanetAssetRecordExists(ual); + await this.repositoryModuleManager.missedParanetAssetRecordExists( + ual, + paranetUAL, + ); // Skip missed KAs as they are synced in the other function if (isMissedAsset) { continue; } - filteredKAs.push([ + await this.repositoryModuleManager.createParanetAssetRecord({ + blockchainId: blockchain, ual, - blockchain, - knowledgeAssetStorageContract, - knowledgeAssetTokenId, - ]); - } + paranetUal: paranetUAL, + }); - if (filteredKAs.length > 0) { - const promises = filteredKAs.map( - ([syncKAUal, syncKABlockchain, syncKAContract, syncKATokenId]) => - this.syncAsset( - syncKAUal, - syncKABlockchain, - syncKAContract, - syncKATokenId, - paranetUAL, - paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, - false, // removeMissingAssetRecord - ), - ); + // so instead of pushing to filtered KAs and syncing + // just add them to DB as missed and let the other loop catch them? - const batchResults = await Promise.all(promises); - results.push(...batchResults); + // filteredKAs.push([ + // ual, + // blockchain, + // knowledgeAssetStorageContract, + // knowledgeAssetTokenId, + // ]); } + + // if (filteredKAs.length > 0) { + // const promises = filteredKAs.map( + // ([syncKAUal, syncKABlockchain, syncKAContract, syncKATokenId]) => + // this.syncAsset( + // syncKAUal, + // syncKABlockchain, + // syncKAContract, + // syncKATokenId, + // paranetUAL, + // paranetId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, + // false, + // ), + // ); + + // const batchResults = await Promise.all(promises); + // results.push(...batchResults); + // } } - const successfulCount = results.filter(Boolean).length; - return [successfulCount, results.length - successfulCount]; + // const successfulCount = results.filter(Boolean).length; + // return [successfulCount, results.length - successfulCount]; } /** diff --git a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js new file mode 100644 index 0000000000..8949803f8f --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js @@ -0,0 +1,65 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.createTable('paranet_asset', { + id: { + type: Sequelize.INTEGER, + primaryKey: true, + autoIncrement: true, + }, + blockchain_id: { + type: Sequelize.STRING, + allowNull: false, + }, + ual: { + type: Sequelize.STRING, + allowNull: false, + }, + paranet_ual: { + type: Sequelize.STRING, + allowNull: false, + }, + public_assertion_id: { + type: Sequelize.STRING, + allowNull: true, + }, + private_assertion_id: { + type: Sequelize.STRING, + allowNull: true, + }, + sender: { + type: Sequelize.STRING, + allowNull: true, + }, + transaction_hash: { + type: Sequelize.STRING, + allowNull: true, + }, + error_message: { + type: Sequelize.TEXT, + allowNull: true, + }, + is_synced: { + type: Sequelize.BOOLEAN, + allowNull: false, + defaultValue: false, + }, + retries: { + allowNull: false, + type: Sequelize.INTEGER, + defaultValue: 0, + }, + created_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + updated_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + }); +} + +export async function down({ context: { queryInterface } }) { + await queryInterface.dropTable('paranet_assets'); +} diff --git a/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js b/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js index c7788955df..e7e1f088d3 100644 --- a/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js +++ b/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js @@ -1,3 +1,5 @@ +// NOT USED ANYMORE + export default (sequelize, DataTypes) => { const blockchain = sequelize.define( 'missed_paranet_asset', diff --git a/src/modules/repository/implementation/sequelize/models/paranet-asset.js b/src/modules/repository/implementation/sequelize/models/paranet-asset.js new file mode 100644 index 0000000000..1ad30e3db7 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/models/paranet-asset.js @@ -0,0 +1,84 @@ +export default (sequelize, DataTypes) => { + const paranetAsset = sequelize.define( + 'paranet_asset', + { + id: { + autoIncrement: true, + primaryKey: true, + type: DataTypes.INTEGER, + }, + blockchainId: { + allowNull: false, + type: DataTypes.STRING, + }, + ual: { + allowNull: false, + type: DataTypes.STRING, + }, + paranetUal: { + allowNull: false, + type: DataTypes.STRING, + }, + publicAssertionId: { + allowNull: true, + type: DataTypes.STRING, + }, + privateAssertionId: { + allowNull: true, + type: DataTypes.STRING, + }, + sender: { + allowNull: true, + type: DataTypes.STRING, + }, + transactionHash: { + allowNull: true, + type: DataTypes.STRING, + }, + errorMessage: { + allowNull: true, + type: DataTypes.TEXT, + }, + isSynced: { + allowNull: false, + type: DataTypes.BOOLEAN, + defaultValue: false, + }, + retries: { + allowNull: false, + type: DataTypes.INTEGER, + defaultValue: 0, + }, + createdAt: { + type: DataTypes.DATE, + }, + updatedAt: { + type: DataTypes.DATE, + }, + }, + { + underscored: true, + indexes: [ + { + unique: true, + fields: ['ual', 'paranetUal'], // Composite unique constraint on `ual` and `paranetUal` + }, + { + fields: ['updatedAt', 'retries', 'isSynced'], + }, + { + fields: ['ual', 'paranetUal'], + }, + { + fields: ['isSynced', 'paranetUal'], + }, + ], + }, + ); + + paranetAsset.associate = () => { + // Define associations here if needed + }; + + return paranetAsset; +}; diff --git a/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js b/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js index 97eb6c0a90..45fc8cad0b 100644 --- a/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js +++ b/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js @@ -1,5 +1,6 @@ import { PARANET_SYNC_SOURCES } from '../../../../../constants/constants.js'; +// NOT USED ANYMORE export default (sequelize, DataTypes) => { const blockchain = sequelize.define( 'paranet_synced_asset', diff --git a/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js new file mode 100644 index 0000000000..d57dba5682 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js @@ -0,0 +1,139 @@ +import Sequelize from 'sequelize'; + +class ParanetAssetRepository { + constructor(models) { + this.sequelize = models.sequelize; + this.model = models.paranet_asset; + } + + async createParanetAssetRecord(missedParanetAsset, options) { + return this.model.create({ ...missedParanetAsset, isSynced: false }, options); + } + + async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) { + return this.model.count({ + where: { + paranetUal, + isSynced: false, + }, + ...options, + }); + } + + async getParanetSyncedAssetRecordsCount(paranetUal, options = {}) { + return this.model.count({ + where: { + paranet_ual: paranetUal, + isSynced: true, + }, + ...options, + }); + } + + // TODO: remove + // async getFilteredCountOfMissedAssetsOfParanet( + // paranetUal, + // retryCountLimit, + // retryDelayInMs, + // options = {}, + // ) { + // const now = new Date(); + // const delayDate = new Date(now.getTime() - retryDelayInMs); + + // const records = await this.model.findAll({ + // where: { + // paranetUal, + // isSynced: false, // Only unsynced assets + // retries: { + // [Sequelize.Op.lt]: retryCountLimit, // Filter by retries count + // }, + // created_at: { + // [Sequelize.Op.lte]: delayDate, // Filter by created_at date + // }, + // }, + // ...options, + // }); + + // return records.length; // Return the count of matching records + // } + + async getMissedParanetAssetsRecordsWithRetryCount( + paranetUal, + retryCountLimit, + retryDelayInMs, + limit = null, + options = {}, + ) { + const now = new Date(); + const delayDate = new Date(now.getTime() - retryDelayInMs); + + const queryOptions = { + where: { + paranetUal, + isSynced: false, + retries: { + [Sequelize.Op.lt]: retryCountLimit, + }, + updated_at: { + [Sequelize.Op.lte]: delayDate, + }, + }, + ...options, + }; + + if (limit !== null) { + queryOptions.limit = limit; + } + + return this.model.findAll(queryOptions); + } + + async missedParanetAssetRecordExists(ual, paranetUal, options = {}) { + const missedParanetAssetRecord = await this.model.findOne({ + where: { ual, isSynced: false }, + ...options, + }); + + return !!missedParanetAssetRecord; + } + + async paranetSyncedAssetRecordExists(ual, paranetUal, options = {}) { + const paranetSyncedAssetRecord = await this.model.getParanetSyncedAssetRecordByUAL( + ual, + paranetUal, + options, + ); + + return !!paranetSyncedAssetRecord; + } + + async updateAssetToBeSynced(ual, paranetUal, options = {}) { + const [affectedRows] = await this.model.update( + { isSynced: true }, + { + where: { + ual, + paranetUal, + }, + ...options, + }, + ); + + return affectedRows; + } + + async incrementRetriesForUalAndParanetUal(ual, paranetUal, options = {}) { + const [affectedRows] = await this.model.increment('retries', { + by: 1, + where: { + ual, + paranetUal, + }, + ...options, + }); + + return affectedRows; + } +} + +export default ParanetAssetRepository; diff --git a/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js b/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js index c9cde1c422..ce00ecb414 100644 --- a/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js @@ -1,3 +1,4 @@ +// DEPRECATED class ParanetSyncedAssetRepository { constructor(models) { this.sequelize = models.sequelize; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 92f4793043..55be3e8986 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -373,8 +373,8 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async createMissedParanetAssetRecord(missedParanetAssset, options = {}) { - return this.getRepository('missed_paranet_asset').createMissedParanetAssetRecord( + async createParanetAssetRecord(missedParanetAssset, options = {}) { + return this.getRepository('paranet_asset').createParanetAssetRecord( missedParanetAssset, options, ); @@ -388,8 +388,13 @@ class RepositoryModuleManager extends BaseModuleManager { } async missedParanetAssetRecordExists(ual, options = {}) { - return this.getRepository('missed_paranet_asset').missedParanetAssetRecordExists( + return this.getRepository('paranet_asset').missedParanetAssetRecordExists(ual, options); + } + + async incrementRetriesForUalAndParanetUal(ual, paranetUal, options = {}) { + return this.getRepository('paranet_asset').incrementRetriesForUalAndParanetUal( ual, + paranetUal, options, ); } @@ -408,9 +413,7 @@ class RepositoryModuleManager extends BaseModuleManager { limit = null, options = {}, ) { - return this.getRepository( - 'missed_paranet_asset', - ).getMissedParanetAssetsRecordsWithRetryCount( + return this.getRepository('paranet_asset').getMissedParanetAssetsRecordsWithRetryCount( paranetUal, retryCountLimit, retryDelayInMs, @@ -419,26 +422,27 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async getCountOfMissedAssetsOfParanet(ual, options = {}) { - return this.getRepository('missed_paranet_asset').getCountOfMissedAssetsOfParanet( - ual, + async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) { + return this.getRepository('paranet_asset').getCountOfMissedAssetsOfParanet( + paranetUal, options, ); } - async getFilteredCountOfMissedAssetsOfParanet( - ual, - retryCountLimit, - retryDelayInMs, - options = {}, - ) { - return this.getRepository('missed_paranet_asset').getFilteredCountOfMissedAssetsOfParanet( - ual, - retryCountLimit, - retryDelayInMs, - options, - ); - } + // TODO: remove + // async getFilteredCountOfMissedAssetsOfParanet( + // ual, + // retryCountLimit, + // retryDelayInMs, + // options = {}, + // ) { + // return this.getRepository('paranet_asset').getFilteredCountOfMissedAssetsOfParanet( + // ual, + // retryCountLimit, + // retryDelayInMs, + // options, + // ); + // } async getParanetsBlockchains(options = {}) { return this.getRepository('paranet').getParanetsBlockchains(options); @@ -455,7 +459,7 @@ class RepositoryModuleManager extends BaseModuleManager { dataSource, options = {}, ) { - return this.getRepository('paranet_synced_asset').createParanetSyncedAssetRecord( + return this.getRepository('paranet_asset').createParanetSyncedAssetRecord( blockchainId, ual, paranetUal, @@ -469,23 +473,21 @@ class RepositoryModuleManager extends BaseModuleManager { } async getParanetSyncedAssetRecordByUAL(ual, options = {}) { - return this.getRepository('paranet_synced_asset').getParanetSyncedAssetRecordByUAL( - ual, - options, - ); + return this.getRepository('paranet_asset').getParanetSyncedAssetRecordByUAL(ual, options); } - async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options = {}) { + async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options = {}) { return this.getRepository( 'paranet_synced_asset', - ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options); + ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options); } async paranetSyncedAssetRecordExists(ual, options = {}) { - return this.getRepository('paranet_synced_asset').paranetSyncedAssetRecordExists( - ual, - options, - ); + return this.getRepository('paranet_asset').paranetSyncedAssetRecordExists(ual, options); + } + + async updateAssetToBeSynced(ual, asset, options = {}) { + return this.getRepository('paranet_asset').updateAssetToBeSynced(ual, asset, options); } async incrementParanetKaCount(paranetId, blockchainId, options = {}) { From a743e45abfe7b0be993f03e3adebf46716a9d711 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Fri, 10 Jan 2025 12:15:00 +0100 Subject: [PATCH 2/9] add more TODO comments --- .../repository/repository-module-manager.js | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 55be3e8986..df306f3294 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -380,12 +380,13 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async getMissedParanetAssetRecords(blockchainId, options = {}) { - return this.getRepository('missed_paranet_asset').getMissedParanetAssetRecords( - blockchainId, - options, - ); - } + // TODO: remove + // async getMissedParanetAssetRecords(blockchainId, options = {}) { + // return this.getRepository('missed_paranet_asset').getMissedParanetAssetRecords( + // blockchainId, + // options, + // ); + // } async missedParanetAssetRecordExists(ual, options = {}) { return this.getRepository('paranet_asset').missedParanetAssetRecordExists(ual, options); @@ -399,12 +400,13 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async removeMissedParanetAssetRecordsByUAL(ual, options = {}) { - return this.getRepository('missed_paranet_asset').removeMissedParanetAssetRecordsByUAL( - ual, - options, - ); - } + // TODO: remove + // async removeMissedParanetAssetRecordsByUAL(ual, options = {}) { + // return this.getRepository('missed_paranet_asset').removeMissedParanetAssetRecordsByUAL( + // ual, + // options, + // ); + // } async getMissedParanetAssetsRecordsWithRetryCount( paranetUal, @@ -459,6 +461,7 @@ class RepositoryModuleManager extends BaseModuleManager { dataSource, options = {}, ) { + // TODO: implement return this.getRepository('paranet_asset').createParanetSyncedAssetRecord( blockchainId, ual, @@ -473,14 +476,16 @@ class RepositoryModuleManager extends BaseModuleManager { } async getParanetSyncedAssetRecordByUAL(ual, options = {}) { + // TODO: implement return this.getRepository('paranet_asset').getParanetSyncedAssetRecordByUAL(ual, options); } - async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options = {}) { - return this.getRepository( - 'paranet_synced_asset', - ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options); - } + // TODO: remove + // async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options = {}) { + // return this.getRepository( + // 'paranet_synced_asset', + // ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options); + // } async paranetSyncedAssetRecordExists(ual, options = {}) { return this.getRepository('paranet_asset').paranetSyncedAssetRecordExists(ual, options); From 0d15166f22bf2b693156ca0e8e5c31ee5f219690 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Fri, 10 Jan 2025 12:27:06 +0100 Subject: [PATCH 3/9] fix; add getParanetSyncedAssetRecordByUAL function --- .../sequelize/repositories/paranet-asset-repository.js | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js index d57dba5682..0de405fbe0 100644 --- a/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js @@ -97,8 +97,15 @@ class ParanetAssetRepository { return !!missedParanetAssetRecord; } + async getParanetSyncedAssetRecordByUAL(ual, paranetUal, options = {}) { + return this.model.findOne({ + where: { ual, paranetUal, isSynced: true }, + ...options, + }); + } + async paranetSyncedAssetRecordExists(ual, paranetUal, options = {}) { - const paranetSyncedAssetRecord = await this.model.getParanetSyncedAssetRecordByUAL( + const paranetSyncedAssetRecord = await this.getParanetSyncedAssetRecordByUAL( ual, paranetUal, options, From cb3d0dc091de41918cdb6f58635787f0560a43c9 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Fri, 10 Jan 2025 12:27:12 +0100 Subject: [PATCH 4/9] fix todos --- src/modules/repository/repository-module-manager.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index df306f3294..7cd315ec47 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -450,6 +450,7 @@ class RepositoryModuleManager extends BaseModuleManager { return this.getRepository('paranet').getParanetsBlockchains(options); } + // TODO: check if used anywhere, remove async createParanetSyncedAssetRecord( blockchainId, ual, @@ -461,7 +462,6 @@ class RepositoryModuleManager extends BaseModuleManager { dataSource, options = {}, ) { - // TODO: implement return this.getRepository('paranet_asset').createParanetSyncedAssetRecord( blockchainId, ual, @@ -475,8 +475,8 @@ class RepositoryModuleManager extends BaseModuleManager { ); } + // TODO: check if used anywhere, remove async getParanetSyncedAssetRecordByUAL(ual, options = {}) { - // TODO: implement return this.getRepository('paranet_asset').getParanetSyncedAssetRecordByUAL(ual, options); } From ee5eb8942d295a2efb095a06a59e76c956bed17c Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Fri, 10 Jan 2025 12:34:08 +0100 Subject: [PATCH 5/9] fix paranet asset repo & model --- .../migrations/20241215122200-create-paranet-assets.js | 6 +++++- .../implementation/sequelize/sequelize-repository.js | 10 ++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js index 8949803f8f..77572ea057 100644 --- a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js +++ b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js @@ -58,8 +58,12 @@ export async function up({ context: { queryInterface, Sequelize } }) { defaultValue: Sequelize.literal('NOW()'), }, }); + await queryInterface.addConstraint('paranet_asset', { + fields: ['ual', 'paranet_ual'], + type: 'unique', + }); } export async function down({ context: { queryInterface } }) { - await queryInterface.dropTable('paranet_assets'); + await queryInterface.dropTable('paranet_asset'); } diff --git a/src/modules/repository/implementation/sequelize/sequelize-repository.js b/src/modules/repository/implementation/sequelize/sequelize-repository.js index e72060f707..e0cfd12a31 100644 --- a/src/modules/repository/implementation/sequelize/sequelize-repository.js +++ b/src/modules/repository/implementation/sequelize/sequelize-repository.js @@ -15,9 +15,10 @@ import OperationResponseRepository from './repositories/operation-response.js'; import ShardRepository from './repositories/shard-repository.js'; import TokenRepository from './repositories/token-repository.js'; import UserRepository from './repositories/user-repository.js'; -import MissedParanetAssetRepository from './repositories/missed-paranet-asset-repository.js'; -import ParanetSyncedAssetRepository from './repositories/paranet-synced-asset-repository.js'; +// import MissedParanetAssetRepository from './repositories/missed-paranet-asset-repository.js'; +// import ParanetSyncedAssetRepository from './repositories/paranet-synced-asset-repository.js'; import FinalityStatusRepository from './repositories/finality-status-repository.js'; +import ParanetAssetRepository from './repositories/paranet-asset-repository.js'; const __dirname = fileURLToPath(new URL('.', import.meta.url)); @@ -38,8 +39,9 @@ class SequelizeRepository { command: new CommandRepository(this.models), event: new EventRepository(this.models), paranet: new ParanetRepository(this.models), - paranet_synced_asset: new ParanetSyncedAssetRepository(this.models), - missed_paranet_asset: new MissedParanetAssetRepository(this.models), + // paranet_synced_asset: new ParanetSyncedAssetRepository(this.models), + // missed_paranet_asset: new MissedParanetAssetRepository(this.models), + paranet_asset: new ParanetAssetRepository(this.models), operation_id: new OperationIdRepository(this.models), operation: new OperationRepository(this.models), operation_response: new OperationResponseRepository(this.models), From a3543dc3c84a8d6f4d9ae9d7c2bcb96844b6e063 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Tue, 14 Jan 2025 12:45:20 +0100 Subject: [PATCH 6/9] register getParanetKnowledgeCollectionCount mock, add TODO comments for deletion --- src/modules/blockchain/blockchain-module-manager.js | 8 ++++++++ .../blockchain/implementation/web3-service.js | 12 ++++++++++++ 2 files changed, 20 insertions(+) diff --git a/src/modules/blockchain/blockchain-module-manager.js b/src/modules/blockchain/blockchain-module-manager.js index 56395fd2b8..8751594af3 100644 --- a/src/modules/blockchain/blockchain-module-manager.js +++ b/src/modules/blockchain/blockchain-module-manager.js @@ -184,12 +184,20 @@ class BlockchainModuleManager extends BaseModuleManager { ]); } + async getParanetKnowledgeCollectionCount(blockchain, paranetId) { + return this.callImplementationFunction(blockchain, 'getParanetKnowledgeCollectionCount', [ + paranetId, + ]); + } + + // TODO: remove? async getParanetKnowledgeAssetsCount(blockchain, paranetId) { return this.callImplementationFunction(blockchain, 'getParanetKnowledgeAssetsCount', [ paranetId, ]); } + // TODO: remove? async getParanetKnowledgeAssetsWithPagination(blockchain, paranetId, offset, limit) { return this.callImplementationFunction( blockchain, diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 34149490ef..db2d17de1d 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -1036,6 +1036,18 @@ class Web3Service { return blockTimestamp; } + async getParanetKnowledgeCollectionCount(paranetId) { + throw new Error(`Not implemented getParanetKnowledgeCollectionCount(${paranetId})`); + // TODO: implement + + // return this.callContractFunction( + // this.contracts.ParanetsRegistry, + // 'getKnowledgeAssetsCount', + // [paranetId], + // CONTRACTS.PARANETS_REGISTRY, + // ); + } + async getParanetKnowledgeAssetsCount(paranetId) { return this.callContractFunction( this.contracts.ParanetsRegistry, From d03752e1275fccb13bd096af0225456f05239172 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Tue, 14 Jan 2025 13:01:51 +0100 Subject: [PATCH 7/9] same for getParanetKnowledgeCollectionsWithPagination --- .../blockchain/blockchain-module-manager.js | 8 ++++++++ .../blockchain/implementation/web3-service.js | 14 ++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/modules/blockchain/blockchain-module-manager.js b/src/modules/blockchain/blockchain-module-manager.js index 8751594af3..fbe334ea56 100644 --- a/src/modules/blockchain/blockchain-module-manager.js +++ b/src/modules/blockchain/blockchain-module-manager.js @@ -190,6 +190,14 @@ class BlockchainModuleManager extends BaseModuleManager { ]); } + async getParanetKnowledgeCollectionsWithPagination(blockchain, paranetId, offset, limit) { + return this.callImplementationFunction( + blockchain, + 'getParanetKnowledgeCollectionsWithPagination', + [paranetId, offset, limit], + ); + } + // TODO: remove? async getParanetKnowledgeAssetsCount(blockchain, paranetId) { return this.callImplementationFunction(blockchain, 'getParanetKnowledgeAssetsCount', [ diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index db2d17de1d..77bb712c15 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -1048,6 +1048,20 @@ class Web3Service { // ); } + async getParanetKnowledgeCollectionsWithPagination(paranetId, offset, limit) { + throw new Error( + `Not implemented getParanetKnowledgeCollectionsWithPagination(${paranetId}, ${offset}, ${limit}})`, + ); + // TODO: implement + + // return this.callContractFunction( + // this.contracts.ParanetsRegistry, + // 'getKnowledgeAssetsWithPagination', + // [paranetId, offset, limit], + // CONTRACTS.PARANETS_REGISTRY, + // ); + } + async getParanetKnowledgeAssetsCount(paranetId) { return this.callContractFunction( this.contracts.ParanetsRegistry, From 2b93e5532b0b542a0045168f7d704395de5f578a Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Tue, 14 Jan 2025 14:02:13 +0100 Subject: [PATCH 8/9] fix --- src/commands/paranet/paranet-sync-command.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/commands/paranet/paranet-sync-command.js b/src/commands/paranet/paranet-sync-command.js index 6b86b17094..10efe81d8d 100644 --- a/src/commands/paranet/paranet-sync-command.js +++ b/src/commands/paranet/paranet-sync-command.js @@ -54,12 +54,13 @@ class ParanetSyncCommand extends Command { const totalMissedAssetsCount = await this.repositoryModuleManager.getCountOfMissedAssetsOfParanet(paranetUAL); - const missedAssetsCount = - await this.repositoryModuleManager.getMissedParanetAssetsRecordsWithRetryCount( + const missedAssetsCount = await this.repositoryModuleManager + .getMissedParanetAssetsRecordsWithRetryCount( paranetUAL, PARANET_SYNC_RETRIES_LIMIT, PARANET_SYNC_RETRY_DELAY_MS, - ); + ) + .then((data) => data.length); const paranetRepository = this.paranetService.getParanetRepositoryName(paranetUAL); From 0abacda965a2e93c6d6bd5854374ae6137377b39 Mon Sep 17 00:00:00 2001 From: aleksaelezovic Date: Tue, 14 Jan 2025 15:09:51 +0100 Subject: [PATCH 9/9] deleted unneccessary fields for paranet asset model --- .../20241215122200-create-paranet-assets.js | 16 ---------------- .../sequelize/models/paranet-asset.js | 16 ---------------- 2 files changed, 32 deletions(-) diff --git a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js index 77572ea057..7780cd23e2 100644 --- a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js +++ b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js @@ -17,22 +17,6 @@ export async function up({ context: { queryInterface, Sequelize } }) { type: Sequelize.STRING, allowNull: false, }, - public_assertion_id: { - type: Sequelize.STRING, - allowNull: true, - }, - private_assertion_id: { - type: Sequelize.STRING, - allowNull: true, - }, - sender: { - type: Sequelize.STRING, - allowNull: true, - }, - transaction_hash: { - type: Sequelize.STRING, - allowNull: true, - }, error_message: { type: Sequelize.TEXT, allowNull: true, diff --git a/src/modules/repository/implementation/sequelize/models/paranet-asset.js b/src/modules/repository/implementation/sequelize/models/paranet-asset.js index 1ad30e3db7..eeb82926d3 100644 --- a/src/modules/repository/implementation/sequelize/models/paranet-asset.js +++ b/src/modules/repository/implementation/sequelize/models/paranet-asset.js @@ -19,22 +19,6 @@ export default (sequelize, DataTypes) => { allowNull: false, type: DataTypes.STRING, }, - publicAssertionId: { - allowNull: true, - type: DataTypes.STRING, - }, - privateAssertionId: { - allowNull: true, - type: DataTypes.STRING, - }, - sender: { - allowNull: true, - type: DataTypes.STRING, - }, - transactionHash: { - allowNull: true, - type: DataTypes.STRING, - }, errorMessage: { allowNull: true, type: DataTypes.TEXT,