diff --git a/package-lock.json b/package-lock.json
index 67bebf224..7ecb4f8b7 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "origintrail_node",
-  "version": "8.0.0+hotfix.3",
+  "version": "8.0.0+hotfix.4",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "origintrail_node",
-      "version": "8.0.0+hotfix.3",
+      "version": "8.0.0+hotfix.4",
       "license": "ISC",
       "dependencies": {
         "@comunica/query-sparql": "^2.4.3",
diff --git a/package.json b/package.json
index 5d0449e3b..355baaef1 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
     "name": "origintrail_node",
-    "version": "8.0.0+hotfix.3",
+    "version": "8.0.0+hotfix.4",
     "description": "OTNode V8",
     "main": "index.js",
     "type": "module",
diff --git a/src/commands/local-store/local-store-command.js b/src/commands/local-store/local-store-command.js
index 37f2bd2a1..732fc4c4f 100644
--- a/src/commands/local-store/local-store-command.js
+++ b/src/commands/local-store/local-store-command.js
@@ -40,6 +40,7 @@ class LocalStoreCommand extends Command {
             tokenId,
             minimumNumberOfNodeReplications,
             batchSize,
+            nodePartOfShard,
         } = command.data;

         try {
@@ -191,21 +192,26 @@ class LocalStoreCommand extends Command {
                 return Command.empty();
             }
-
+            let v;
+            let r;
+            let s;
+            let vs;
             const identityId = await this.blockchainModuleManager.getIdentityId(blockchain);
-            const { v, r, s, vs } = await this.signatureService.signMessage(
-                blockchain,
-                datasetRoot,
-            );
-            await this.signatureService.addSignatureToStorage(
-                NETWORK_SIGNATURES_FOLDER,
-                operationId,
-                identityId,
-                v,
-                r,
-                s,
-                vs,
-            );
+            if (nodePartOfShard) {
+                ({ v, r, s, vs } = await this.signatureService.signMessage(
+                    blockchain,
+                    datasetRoot,
+                ));
+                await this.signatureService.addSignatureToStorage(
+                    NETWORK_SIGNATURES_FOLDER,
+                    operationId,
+                    identityId,
+                    v,
+                    r,
+                    s,
+                    vs,
+                );
+            }

             const {
                 v: publisherNodeV,
@@ -239,16 +245,24 @@ class LocalStoreCommand extends Command {
                 batchSize: batchSizePar,
                 minAckResponses,
             };
-
-            await this.operationService.processResponse(
-                { ...command, data: updatedData },
-                OPERATION_REQUEST_STATUS.COMPLETED,
-                {
-                    messageType: NETWORK_MESSAGE_TYPES.RESPONSES.ACK,
-                    messageData: { identityId, v, r, s, vs },
-                },
-                null,
-            );
+            if (nodePartOfShard) {
+                await this.operationService.processResponse(
+                    { ...command, data: updatedData },
+                    OPERATION_REQUEST_STATUS.COMPLETED,
+                    {
+                        messageType: NETWORK_MESSAGE_TYPES.RESPONSES.ACK,
+                        messageData: { identityId, v, r, s, vs },
+                    },
+                    null,
+                );
+            } else {
+                await this.operationService.processResponse(
+                    { ...command, data: updatedData },
+                    OPERATION_REQUEST_STATUS.FAILED,
+                    {},
+                    'Node is not part of the shard.',
+                );
+            }
         } catch (e) {
             await this.handleError(operationId, blockchain, e.message, this.errorType, true);
             return Command.empty();
diff --git a/src/commands/protocols/common/find-shard-command.js b/src/commands/protocols/common/find-shard-command.js
index 34a646216..8530e0314 100644
--- a/src/commands/protocols/common/find-shard-command.js
+++ b/src/commands/protocols/common/find-shard-command.js
@@ -114,6 +114,7 @@ class FindShardCommand extends Command {
         return this.continueSequence(
             {
                 ...command.data,
+                nodePartOfShard,
                 leftoverNodes: shardNodes,
                 numberOfFoundNodes: shardNodes.length + (nodePartOfShard ? 1 : 0),
             },
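The find-shard change above threads `nodePartOfShard` through `command.data`, and the local-store change consumes it: only shard members sign and ACK, everyone else fails the response. A minimal sketch of that branch, with a hypothetical `services` stand-in for the injected signature service:

```js
// Sketch under assumed names; the real command also persists the signature
// and builds a full updatedData payload before responding.
async function handleLocalStore(commandData, services) {
    const { nodePartOfShard, blockchain, datasetRoot } = commandData;

    if (nodePartOfShard) {
        // Shard members sign the dataset root and acknowledge the store.
        const { v, r, s, vs } = await services.signatureService.signMessage(
            blockchain,
            datasetRoot,
        );
        return { status: 'COMPLETED', signature: { v, r, s, vs } };
    }

    // Nodes outside the shard fail fast instead of signing.
    return { status: 'FAILED', reason: 'Node is not part of the shard.' };
}
```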
diff --git a/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-request-command.js b/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-request-command.js
index 3210c2267..b9057861f 100644
--- a/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-request-command.js
+++ b/src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-get-request-command.js
@@ -37,6 +37,7 @@ class HandleGetRequestCommand extends HandleProtocolMessageCommand {
             ual,
             includeMetadata,
             isOperationV0,
+            isV6Contract,
         } = commandData;

         let { assertionId, knowledgeAssetId } = commandData;
@@ -109,6 +110,7 @@ class HandleGetRequestCommand extends HandleProtocolMessageCommand {
         );

         let assertionPromise;
+        let notMigrated = false;

         if (!assertionId) {
             assertionId = await this.tripleStoreService.getLatestAssertionId(
@@ -123,39 +125,50 @@ class HandleGetRequestCommand extends HandleProtocolMessageCommand {

         if (assertionId) {
             // DO NOT RUN THIS IF !assertionId
-            assertionPromise = this.tripleStoreService
-                .getV6Assertion(TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT, assertionId)
-                .then(async (result) => {
-                    if (!result?.length) {
-                        this.logger.info(
-                            `No V6 assertion found for assertionId: ${assertionId}, falling back to V8 getAssertion`,
-                        );
+            assertionPromise = (async () => {
+                let result = null;
+
+                if (!isOperationV0) {
+                    result = await this.tripleStoreService.getAssertion(
+                        blockchain,
+                        contract,
+                        knowledgeCollectionId,
+                        knowledgeAssetId,
+                        TRIPLES_VISIBILITY.PUBLIC,
+                    );
+                }
+
+                if (!result?.length) {
+                    // eslint-disable-next-line no-await-in-loop
+                    result = await this.tripleStoreService.getV6Assertion(
+                        TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT,
+                        assertionId,
+                    );
+                    if (result?.length && !isOperationV0) {
+                        notMigrated = true;
+                    }

-                        const fallbackResult = await this.tripleStoreService.getAssertion(
+                    if (!result?.length && isOperationV0) {
+                        result = await this.tripleStoreService.getAssertion(
                             blockchain,
                             contract,
                             knowledgeCollectionId,
                             knowledgeAssetId,
                             TRIPLES_VISIBILITY.PUBLIC,
                         );
-
-                        this.operationIdService.emitChangeEvent(
-                            OPERATION_ID_STATUS.GET.GET_REMOTE_GET_ASSERTION_END,
-                            operationId,
-                            blockchain,
-                        );
-
-                        return fallbackResult;
                     }
+                }

-                    this.operationIdService.emitChangeEvent(
-                        OPERATION_ID_STATUS.GET.GET_REMOTE_GET_ASSERTION_END,
-                        operationId,
-                        blockchain,
-                    );
+                this.operationIdService.emitChangeEvent(
+                    OPERATION_ID_STATUS.GET.GET_REMOTE_GET_ASSERTION_END,
+                    operationId,
+                    blockchain,
+                );

-                    return result.split('\n').filter((res) => res.length > 0);
-                });
+                return typeof result === 'string'
+                    ? result.split('\n').filter((res) => res.length > 0)
+                    : result;
+            })();
         } else {
             if (!knowledgeAssetId) {
                 try {
@@ -223,9 +236,10 @@ class HandleGetRequestCommand extends HandleProtocolMessageCommand {
         const [assertion, metadata] = await Promise.all(promises);

         const responseData = {
-            assertion: isOperationV0
-                ? [...(assertion.public ?? []), ...(assertion.private ?? [])]
-                : assertion,
+            assertion:
+                (isOperationV0 || notMigrated) && isV6Contract
+                    ? [...(assertion.public ?? []), ...(assertion.private ?? [])]
+                    : assertion,
             ...(includeMetadata && metadata && { metadata }),
         };
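The reworked `assertionPromise` above reduces to the lookup order sketched below (store calls stubbed): V8 repository first for V8 operations, then the V6 store, with a V6 hit on a V8 operation flagged as `notMigrated` so the response can later be flattened for legacy consumers.

```js
// getV8/getV6 are hypothetical stubs for tripleStoreService.getAssertion
// and tripleStoreService.getV6Assertion.
async function resolveAssertion({ isOperationV0, getV8, getV6 }) {
    let notMigrated = false;
    let result = null;

    if (!isOperationV0) {
        result = await getV8(); // V8-native read path first
    }
    if (!result?.length) {
        result = await getV6(); // fall back to the V6 assertion store
        if (result?.length && !isOperationV0) notMigrated = true; // present, not yet migrated
        if (!result?.length && isOperationV0) result = await getV8(); // V0 last resort
    }
    return { result, notMigrated };
}
```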
diff --git a/src/commands/protocols/get/sender/get-find-shard-command.js b/src/commands/protocols/get/sender/get-find-shard-command.js
index f3f0e5842..7570fee48 100644
--- a/src/commands/protocols/get/sender/get-find-shard-command.js
+++ b/src/commands/protocols/get/sender/get-find-shard-command.js
@@ -21,10 +21,7 @@ class GetFindShardCommand extends FindShardCommand {
     // eslint-disable-next-line no-unused-vars
     getOperationCommandSequence(nodePartOfShard, commandData) {
         const sequence = [];
-        if (nodePartOfShard) {
-            sequence.push('localGetCommand');
-        }
-        sequence.push('networkGetCommand');
+        sequence.push('localGetCommand', 'networkGetCommand');

         return sequence;
     }
diff --git a/src/commands/protocols/get/sender/get-validate-asset-command.js b/src/commands/protocols/get/sender/get-validate-asset-command.js
index 1b5423490..dc65a1025 100644
--- a/src/commands/protocols/get/sender/get-validate-asset-command.js
+++ b/src/commands/protocols/get/sender/get-validate-asset-command.js
@@ -1,10 +1,6 @@
 import ValidateAssetCommand from '../../../common/validate-asset-command.js';
 import Command from '../../../command.js';
-import {
-    OPERATION_ID_STATUS,
-    ERROR_TYPE,
-    OLD_CONTENT_STORAGE_MAP,
-} from '../../../../constants/constants.js';
+import { OPERATION_ID_STATUS, ERROR_TYPE } from '../../../../constants/constants.js';

 class GetValidateAssetCommand extends ValidateAssetCommand {
     constructor(ctx) {
@@ -27,8 +23,15 @@ class GetValidateAssetCommand extends ValidateAssetCommand {
      * @param command
      */
     async execute(command) {
-        const { operationId, blockchain, contract, knowledgeCollectionId, ual, isOperationV0 } =
-            command.data;
+        const {
+            operationId,
+            blockchain,
+            contract,
+            knowledgeCollectionId,
+            ual,
+            isOperationV0,
+            isV6Contract,
+        } = command.data;
         await this.operationIdService.updateOperationIdStatus(
             operationId,
             blockchain,
@@ -53,13 +56,7 @@ class GetValidateAssetCommand extends ValidateAssetCommand {
             blockchain,
         );
         // TODO: Update to validate knowledge asset index
-        // TODO: Use isOldContract as variable and pass it through with command.data since it's used
-        if (
-            !isOperationV0 &&
-            Object.values(OLD_CONTENT_STORAGE_MAP).every(
-                (ca) => !ca.toLowerCase().includes(contract.toLowerCase()),
-            )
-        ) {
+        if (!isOperationV0 && !isV6Contract) {
             const isValidUal = await this.validationService.validateUal(
                 blockchain,
                 contract,
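The removed inline `OLD_CONTENT_STORAGE_MAP` check above is replaced by the `isV6Contract` flag, computed once in the HTTP controllers (see their diffs further down) and passed along with `command.data`. Standalone, with the map truncated to one entry for illustration:

```js
// Truncated copy of V6_CONTENT_STORAGE_MAP from constants.js.
const V6_CONTENT_STORAGE_MAP = {
    BASE_MAINNET: '0x3bdfA81079B2bA53a25a6641608E5E1E6c464597',
};

function isV6Contract(contract) {
    // Case-insensitive containment check against known V6 storage addresses.
    return Object.values(V6_CONTENT_STORAGE_MAP).some((ca) =>
        ca.toLowerCase().includes(contract.toLowerCase()),
    );
}
```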
diff --git a/src/commands/protocols/get/sender/local-get-command.js b/src/commands/protocols/get/sender/local-get-command.js
index 48969419d..e8503e700 100644
--- a/src/commands/protocols/get/sender/local-get-command.js
+++ b/src/commands/protocols/get/sender/local-get-command.js
@@ -35,6 +35,7 @@ class LocalGetCommand extends Command {
             contentType,
             assertionId,
             isOperationV0,
+            isV6Contract,
         } = command.data;
         let { knowledgeAssetId } = command.data;
         await this.operationIdService.updateOperationIdStatus(
@@ -103,23 +104,13 @@ class LocalGetCommand extends Command {
         );

         let assertionPromise;
+        let notMigrated = false;

         if (assertionId) {
             assertionPromise = (async () => {
                 let result = null;

-                for (const repository of [
-                    TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT,
-                    TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT,
-                ]) {
-                    // eslint-disable-next-line no-await-in-loop
-                    result = await this.tripleStoreService.getV6Assertion(repository, assertionId);
-                    if (result?.length) {
-                        break;
-                    }
-                }
-
-                if (!result?.length) {
+                if (!isOperationV0) {
                     result = await this.tripleStoreService.getAssertion(
                         blockchain,
                         contract,
@@ -128,6 +119,35 @@ class LocalGetCommand extends Command {
                         contentType,
                     );
                 }
+
+                if (!result?.length) {
+                    for (const repository of [
+                        TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT,
+                        TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT,
+                    ]) {
+                        // eslint-disable-next-line no-await-in-loop
+                        result = await this.tripleStoreService.getV6Assertion(
+                            repository,
+                            assertionId,
+                        );
+                        if (result?.length) {
+                            if (!isOperationV0) {
+                                notMigrated = true;
+                            }
+                            break;
+                        }
+                    }
+                    if (!result?.length && isOperationV0) {
+                        result = await this.tripleStoreService.getAssertion(
+                            blockchain,
+                            contract,
+                            knowledgeCollectionId,
+                            knowledgeAssetId,
+                            contentType,
+                        );
+                    }
+                }
+
                 this.operationIdService.emitChangeEvent(
                     OPERATION_ID_STATUS.GET.GET_LOCAL_GET_ASSERTION_END,
                     operationId,
@@ -206,9 +226,10 @@ class LocalGetCommand extends Command {
         const [assertion, metadata] = await Promise.all(promises);

         const responseData = {
-            assertion: isOperationV0
-                ? [...(assertion?.public ?? []), ...(assertion?.private ?? [])]
-                : assertion,
+            assertion:
+                (isOperationV0 || notMigrated) && isV6Contract
+                    ? [...(assertion?.public ?? []), ...(assertion?.private ?? [])]
+                    : assertion,
             ...(includeMetadata && metadata && { metadata }),
         };
diff --git a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js
index cf2e0cd52..d541816bd 100644
--- a/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js
+++ b/src/commands/protocols/get/sender/v1.0.0/v1-0-0-get-request-command.js
@@ -7,7 +7,6 @@ import {
     OPERATION_STATUS,
     OPERATION_ID_STATUS,
     PRIVATE_HASH_SUBJECT_PREFIX,
-    OLD_CONTENT_STORAGE_MAP,
 } from '../../../../../constants/constants.js';

 class GetRequestCommand extends ProtocolRequestCommand {
@@ -52,6 +51,7 @@ class GetRequestCommand extends ProtocolRequestCommand {
             paranetId,
             isOperationV0,
             assertionId,
+            isV6Contract,
         } = command.data;

         return {
@@ -65,16 +65,19 @@ class GetRequestCommand extends ProtocolRequestCommand {
             paranetId,
             isOperationV0,
             assertionId,
+            isV6Contract,
         };
     }

     async handleAck(command, responseData) {
-        const { blockchain, contract, knowledgeCollectionId, knowledgeAssetId, isOperationV0 } =
-            command.data;
-
-        const isOldContract = Object.values(OLD_CONTENT_STORAGE_MAP).some((ca) =>
-            ca.toLowerCase().includes(contract.toLowerCase()),
-        );
+        const {
+            blockchain,
+            contract,
+            knowledgeCollectionId,
+            knowledgeAssetId,
+            isOperationV0,
+            isV6Contract,
+        } = command.data;

         if (responseData?.assertion?.public) {
             // Only whole collection can be validated not particular KA
@@ -100,7 +103,7 @@ class GetRequestCommand extends ProtocolRequestCommand {
                 ...kcTools.groupNquadsBySubject(privateHashTriples, true),
             );

-            if (!isOldContract) {
+            if (!isV6Contract) {
                 try {
                     await this.validationService.validateDatasetOnBlockchain(
                         publicKnowledgeAssetsTriplesGrouped.map((t) => t.sort()).flat(),
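The `handleAck` change above amounts to one gate: on-chain dataset validation is skipped for assets still living on a V6 content-storage contract, whose roots predate the V8 scheme. Condensed, with a stubbed service and hypothetical argument list:

```js
// validationService is a stand-in; the real call also receives the
// knowledge-collection identifiers from command.data.
async function maybeValidateOnChain(flatTriples, isV6Contract, validationService, blockchain) {
    if (isV6Contract) return; // legacy contract: skip on-chain validation
    await validationService.validateDatasetOnBlockchain(flatTriples, blockchain);
}
```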
diff --git a/src/commands/protocols/publish/read-cached-publish-data-command.js b/src/commands/protocols/publish/read-cached-publish-data-command.js
index 504a92bbc..d12827645 100644
--- a/src/commands/protocols/publish/read-cached-publish-data-command.js
+++ b/src/commands/protocols/publish/read-cached-publish-data-command.js
@@ -10,11 +10,7 @@ class ReadCachedPublishDataCommand extends Command {
     constructor(ctx) {
         super(ctx);
         this.ualService = ctx.ualService;
-        this.dataService = ctx.dataService;
         this.fileService = ctx.fileService;
-        this.repositoryModuleManager = ctx.repositoryModuleManager;
-        this.networkModuleManager = ctx.networkModuleManager;
-
         this.errorType = ERROR_TYPE.STORE_ASSERTION_ERROR;
     }

@@ -41,17 +37,6 @@ class ReadCachedPublishDataCommand extends Command {

         const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

-        const myPeerId = this.networkModuleManager.getPeerId().toB58String();
-        if (cachedData.remotePeerId === myPeerId) {
-            await this.repositoryModuleManager.saveFinalityAck(
-                publishOperationId,
-                ual,
-                cachedData.remotePeerId,
-            );
-        } else {
-            command.sequence.push('findPublisherNodeCommand', 'networkFinalityCommand');
-        }
-
         return this.continueSequence(
             {
                 operationId,
diff --git a/src/commands/protocols/publish/sender/publish-find-shard-command.js b/src/commands/protocols/publish/sender/publish-find-shard-command.js
index 1d9bff3af..300e45cbb 100644
--- a/src/commands/protocols/publish/sender/publish-find-shard-command.js
+++ b/src/commands/protocols/publish/sender/publish-find-shard-command.js
@@ -24,7 +24,7 @@ class PublishFindShardCommand extends FindShardCommand {
             sequence.push('publishValidateAssetCommand');
         }

-        if (nodePartOfShard && !commandData.isOperationV0) {
+        if (!commandData.isOperationV0) {
             sequence.push('localStoreCommand');
         } else {
             sequence.push('networkPublishCommand');
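The finality handling deleted from `readCachedPublishDataCommand` earlier reappears in `storeAssertionCommand` below, so the ACK is only recorded after the assertion is actually inserted. Its decision, reduced to a pure function with illustrative names:

```js
// Peer ids are libp2p base58 strings, as produced by getPeerId().toB58String().
function finalityPlan(publisherPeerId, myPeerId) {
    if (publisherPeerId === myPeerId) {
        // This node published the asset itself: record the finality ack locally.
        return { action: 'saveFinalityAck' };
    }
    // Otherwise confirm finality with the publisher over the network.
    return { action: 'enqueue', commands: ['findPublisherNodeCommand', 'networkFinalityCommand'] };
}
```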
diff --git a/src/commands/protocols/publish/store-assertion-command.js b/src/commands/protocols/publish/store-assertion-command.js
index 2bc8e2b14..8f7298c0d 100644
--- a/src/commands/protocols/publish/store-assertion-command.js
+++ b/src/commands/protocols/publish/store-assertion-command.js
@@ -12,12 +12,21 @@ class StoreAssertionCommand extends Command {
         this.ualService = ctx.ualService;
         this.dataService = ctx.dataService;
         this.tripleStoreService = ctx.tripleStoreService;
+        this.networkModuleManager = ctx.networkModuleManager;
+        this.repositoryModuleManager = ctx.repositoryModuleManager;

         this.errorType = ERROR_TYPE.STORE_ASSERTION_ERROR;
     }

     async execute(command) {
-        const { operationId, ual, blockchain, assertion } = command.data;
+        const {
+            operationId,
+            ual,
+            blockchain,
+            assertion,
+            publishOperationId,
+            remotePeerId: publisherPeerId,
+        } = command.data;

         await this.operationIdService.updateOperationIdStatus(
             operationId,
@@ -26,17 +35,28 @@ class StoreAssertionCommand extends Command {
         );
         try {
             await this._insertAssertion(assertion, ual);
+
+            await this.operationIdService.updateOperationIdStatus(
+                operationId,
+                blockchain,
+                OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_END,
+            );
+
+            const myPeerId = this.networkModuleManager.getPeerId().toB58String();
+            if (publisherPeerId === myPeerId) {
+                await this.repositoryModuleManager.saveFinalityAck(
+                    publishOperationId,
+                    ual,
+                    publisherPeerId,
+                );
+            } else {
+                command.sequence.push('findPublisherNodeCommand', 'networkFinalityCommand');
+            }
         } catch (e) {
             await this.handleError(operationId, blockchain, e.message, this.errorType, true);
             return Command.empty(); // TODO: Should it end here or do a retry?
         }

-        await this.operationIdService.updateOperationIdStatus(
-            operationId,
-            blockchain,
-            OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_STORE_ASSERTION_END,
-        );
-
         return this.continueSequence(command.data, command.sequence);
     }
diff --git a/src/constants/constants.js b/src/constants/constants.js
index 0ae75657b..67491b9fd 100644
--- a/src/constants/constants.js
+++ b/src/constants/constants.js
@@ -1173,8 +1173,8 @@ export const LOCAL_INSERT_FOR_ASSET_SYNC_RETRY_DELAY = 1000;
 export const LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS = 5;
 export const LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY = 1000;

-export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 5;
-export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10000;
+export const MAX_RETRIES_READ_CACHED_PUBLISH_DATA = 10;
+export const RETRY_DELAY_READ_CACHED_PUBLISH_DATA = 10 * 1000;

 export const TRIPLE_STORE_REPOSITORY = {
     DKG: 'dkg',
@@ -1187,7 +1187,7 @@ export const TRIPLES_VISIBILITY = {
     ALL: 'all',
 };

-export const OLD_CONTENT_STORAGE_MAP = {
+export const V6_CONTENT_STORAGE_MAP = {
     BASE_MAINNET: '0x3bdfA81079B2bA53a25a6641608E5E1E6c464597',
     BASE_TESTNET: '0x9e3071Dc0730CB6dd0ce42969396D716Ea33E7e1',
     BASE_DEVNET: '0xBe08A25dcF2B68af88501611e5456571f50327B4',
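Alongside the rename above, the map is now probed with a containment check rather than the old exact-match lookup used in the controllers. Both accept a full address regardless of case; the new form also tolerates a partial match:

```js
const stored = '0x3bdfA81079B2bA53a25a6641608E5E1E6c464597'; // from V6_CONTENT_STORAGE_MAP
const queried = '0x3bdfa81079b2ba53a25a6641608e5e1e6c464597'; // lower-cased caller input

// old style: exact equality after lower-casing both sides
console.log([stored].map((ca) => ca.toLowerCase()).includes(queried.toLowerCase())); // true
// new style: containment after lower-casing both sides
console.log(stored.toLowerCase().includes(queried.toLowerCase())); // true
```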
diff --git a/src/controllers/http-api/v0/get-http-api-controller-v0.js b/src/controllers/http-api/v0/get-http-api-controller-v0.js
index 39c1f370c..3f8385e7d 100644
--- a/src/controllers/http-api/v0/get-http-api-controller-v0.js
+++ b/src/controllers/http-api/v0/get-http-api-controller-v0.js
@@ -3,7 +3,7 @@ import {
     OPERATION_STATUS,
     ERROR_TYPE,
     TRIPLES_VISIBILITY,
-    OLD_CONTENT_STORAGE_MAP,
+    V6_CONTENT_STORAGE_MAP,
 } from '../../../constants/constants.js';
 import BaseController from '../base-http-api-controller.js';

@@ -65,14 +65,13 @@ class GetController extends BaseController {
         // Get assertionId - datasetRoot
         //

+        const isV6Contract = Object.values(V6_CONTENT_STORAGE_MAP).some((ca) =>
+            ca.toLowerCase().includes(contract.toLowerCase()),
+        );
+
         const commandSequence = [];

-        if (
-            !tripleStoreMigrationAlreadyExecuted &&
-            Object.values(OLD_CONTENT_STORAGE_MAP)
-                .map((ca) => ca.toLowerCase())
-                .includes(contract.toLowerCase())
-        ) {
+        if (!tripleStoreMigrationAlreadyExecuted && isV6Contract) {
             commandSequence.push('getAssertionMerkleRootCommand');
         }

@@ -91,6 +90,7 @@ class GetController extends BaseController {
                 knowledgeAssetId,
                 operationId,
                 paranetUAL,
+                isV6Contract,
                 contentType: contentType ?? TRIPLES_VISIBILITY.ALL,
                 isOperationV0: true,
             },
diff --git a/src/controllers/http-api/v1/get-http-api-controller-v1.js b/src/controllers/http-api/v1/get-http-api-controller-v1.js
index 1993bca28..d139c5bdb 100644
--- a/src/controllers/http-api/v1/get-http-api-controller-v1.js
+++ b/src/controllers/http-api/v1/get-http-api-controller-v1.js
@@ -3,7 +3,7 @@ import {
     OPERATION_STATUS,
     ERROR_TYPE,
     TRIPLES_VISIBILITY,
-    OLD_CONTENT_STORAGE_MAP,
+    V6_CONTENT_STORAGE_MAP,
 } from '../../../constants/constants.js';
 import BaseController from '../base-http-api-controller.js';

@@ -66,15 +66,14 @@ class GetController extends BaseController {
         // Get assertionId - datasetRoot
         //

+        const isV6Contract = Object.values(V6_CONTENT_STORAGE_MAP).some((ca) =>
+            ca.toLowerCase().includes(contract.toLowerCase()),
+        );
+
         const commandSequence = [];
         commandSequence.push('getValidateAssetCommand');

-        if (
-            !tripleStoreMigrationAlreadyExecuted &&
-            Object.values(OLD_CONTENT_STORAGE_MAP)
-                .map((ca) => ca.toLowerCase())
-                .includes(contract.toLowerCase())
-        ) {
+        if (!tripleStoreMigrationAlreadyExecuted && isV6Contract) {
             commandSequence.push('getAssertionMerkleRootCommand');
         }

@@ -93,6 +92,7 @@ class GetController extends BaseController {
                 knowledgeAssetId,
                 operationId,
                 paranetUAL,
+                isV6Contract,
                 contentType: contentType ?? TRIPLES_VISIBILITY.ALL,
             },
             transactional: false,
diff --git a/src/controllers/rpc/get-rpc-controller.js b/src/controllers/rpc/get-rpc-controller.js
index 32b242b84..b04812eb9 100644
--- a/src/controllers/rpc/get-rpc-controller.js
+++ b/src/controllers/rpc/get-rpc-controller.js
@@ -39,6 +39,7 @@ class GetController extends BaseController {
                 paranetId: message.data.paranetId,
                 isOperationV0: message.data.isOperationV0,
                 assertionId: message.data.assertionId,
+                isV6Contract: message.data.isV6Contract,
             },
             transactional: false,
         });
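The migration below wraps its `CREATE TRIGGER` statements in an `information_schema` existence check so re-running it is a no-op. The same pattern as a standalone helper (hypothetical, not part of the patch):

```js
// queryInterface is Sequelize's QueryInterface; ddl is a full CREATE TRIGGER statement.
async function createTriggerIfMissing(queryInterface, name, ddl) {
    const [rows] = await queryInterface.sequelize.query(`
        SELECT COUNT(*) AS trigger_exists
        FROM information_schema.triggers
        WHERE trigger_schema = DATABASE()
        AND trigger_name = '${name}';
    `);
    if (rows[0].trigger_exists === 0) {
        await queryInterface.sequelize.query(ddl);
    }
}
```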
diff --git a/src/modules/repository/implementation/sequelize/migrations/20240924161700-create-paranet-synced-asset.js b/src/modules/repository/implementation/sequelize/migrations/20240924161700-create-paranet-synced-asset.js
index 3c7138611..f84a5bbf6 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20240924161700-create-paranet-synced-asset.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20240924161700-create-paranet-synced-asset.js
@@ -45,34 +45,63 @@ export const up = async ({ context: { queryInterface, Sequelize } }) => {
         },
     });

-    await queryInterface.sequelize.query(`
-        CREATE TRIGGER before_insert_paranet_synced_asset
-        BEFORE INSERT ON paranet_synced_asset
-        FOR EACH ROW
-        SET NEW.created_at = NOW();
+    const [triggerInsertExists] = await queryInterface.sequelize.query(`
+        SELECT COUNT(*) AS trigger_exists
+        FROM information_schema.triggers
+        WHERE trigger_schema = DATABASE()
+        AND trigger_name = 'before_insert_paranet_synced_asset';
     `);
+    if (triggerInsertExists[0].trigger_exists === 0) {
+        await queryInterface.sequelize.query(`
+            CREATE TRIGGER before_insert_paranet_synced_asset
+            BEFORE INSERT ON paranet_synced_asset
+            FOR EACH ROW
+            BEGIN
+                SET NEW.created_at = NOW();
+            END;
+        `);
+    }

-    await queryInterface.sequelize.query(`
-        CREATE TRIGGER before_update_paranet_synced_asset
-        BEFORE UPDATE ON paranet_synced_asset
-        FOR EACH ROW
-        SET NEW.updated_at = NOW();
+    const [triggerUpdateExists] = await queryInterface.sequelize.query(`
+        SELECT COUNT(*) AS trigger_exists
+        FROM information_schema.triggers
+        WHERE trigger_schema = DATABASE()
+        AND trigger_name = 'before_update_paranet_synced_asset';
     `);
+    if (triggerUpdateExists[0].trigger_exists === 0) {
+        await queryInterface.sequelize.query(`
+            CREATE TRIGGER before_update_paranet_synced_asset
+            BEFORE UPDATE ON paranet_synced_asset
+            FOR EACH ROW
+            BEGIN
+                SET NEW.updated_at = NOW();
+            END;
+        `);
+    }

-    await queryInterface.sequelize.query(`
-        CREATE INDEX idx_paranet_ual_created_at
-        ON paranet_synced_asset (paranet_ual, created_at);
-    `);
+    const indexes = [
+        { name: 'idx_paranet_ual_created_at', columns: '(paranet_ual, created_at)' },
+        { name: 'idx_sender', columns: '(sender)' },
+        { name: 'idx_paranet_ual_unique', columns: '(paranet_ual)' },
+    ];

-    await queryInterface.sequelize.query(`
-        CREATE INDEX idx_sender
-        ON paranet_synced_asset (sender);
-    `);
-
-    await queryInterface.sequelize.query(`
-        CREATE INDEX idx_paranet_ual_unique
-        ON paranet_synced_asset (paranet_ual);
-    `);
+    for (const index of indexes) {
+        // eslint-disable-next-line no-await-in-loop
+        const [indexExists] = await queryInterface.sequelize.query(`
+            SELECT COUNT(*) AS index_exists
+            FROM information_schema.statistics
+            WHERE table_schema = DATABASE()
+            AND table_name = 'paranet_synced_asset'
+            AND index_name = '${index.name}';
+        `);
+        if (indexExists[0].index_exists === 0) {
+            // eslint-disable-next-line no-await-in-loop
+            await queryInterface.sequelize.query(`
+                CREATE INDEX ${index.name}
+                ON paranet_synced_asset ${index.columns};
+            `);
+        }
+    }
 };

 export const down = async ({ context: { queryInterface } }) => {
@@ -82,7 +111,6 @@ export const down = async ({ context: { queryInterface } }) => {
         DROP TRIGGER IF EXISTS before_insert_paranet_synced_asset;
     `);

-    // Delete the before-update trigger
     await queryInterface.sequelize.query(`
         DROP TRIGGER IF EXISTS before_update_paranet_synced_asset;
     `);
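The index migrations above and below share the same idempotency probe against `information_schema.statistics`; extracted into a standalone predicate (hypothetical helper mirroring the inline queries):

```js
async function indexExists(queryInterface, table, name) {
    const [rows] = await queryInterface.sequelize.query(`
        SELECT COUNT(*) AS index_exists
        FROM information_schema.statistics
        WHERE table_schema = DATABASE()
        AND table_name = '${table}'
        AND index_name = '${name}';
    `);
    return rows[0].index_exists > 0;
}
```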
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241105160000-add-indexes-to-tables.js b/src/modules/repository/implementation/sequelize/migrations/20241105160000-add-indexes-to-tables.js
index 047788728..249128004 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241105160000-add-indexes-to-tables.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241105160000-add-indexes-to-tables.js
@@ -74,79 +74,74 @@ export async function up({ context: { queryInterface } }) {
         const { table, column, name } = index;

         // eslint-disable-next-line no-await-in-loop
-        const [results] = await queryInterface.sequelize.query(`
-            SELECT COUNT(1) AS count
-            FROM INFORMATION_SCHEMA.STATISTICS
-            WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = '${table}' AND INDEX_NAME = '${name}';
+        const [indexExists] = await queryInterface.sequelize.query(`
+            SELECT COUNT(*) AS index_exists
+            FROM information_schema.statistics
+            WHERE table_schema = DATABASE()
+            AND table_name = '${table}'
+            AND index_name = '${name}';
         `);
-
-        if (results[0].count === 0) {
+        if (indexExists[0].index_exists === 0) {
             // eslint-disable-next-line no-await-in-loop
-            await queryInterface.addIndex(table, column, { name });
+            await queryInterface.sequelize.query(`
+                CREATE INDEX \`${name}\`
+                ON \`${table}\` (${column.map((col) => `\`${col}\``).join(', ')});
+            `);
         }
     }
 }

 export async function down({ context: { queryInterface } }) {
-    await queryInterface.removeIndex('shard', 'shard_blockchain_id_index');
-
-    await queryInterface.removeIndex('shard', 'last_dialed_index');
-
-    await queryInterface.removeIndex('paranet_synced_asset', 'paranet_synced_asset_ual_index');
-
-    await queryInterface.removeIndex('paranet_synced_asset', 'paranet_ual_data_source_index');
-
-    await queryInterface.removeIndex('paranet', 'blockchain_id_paranet_id_index');
-
-    await queryInterface.removeIndex('missed_paranet_asset', 'paranet_ual_index');
-
-    await queryInterface.removeIndex('missed_paranet_asset', 'missed_paranet_asset_ual_index');
-
-    await queryInterface.removeIndex('event', 'name_timestamp_index');
-
-    await queryInterface.removeIndex('event', 'event_operation_id_index');
-
-    await queryInterface.removeIndex('commands', 'name_status_index');
-
-    await queryInterface.removeIndex('commands', 'status_started_at_index');
-
-    await queryInterface.removeIndex('get', 'get_operation_id_index');
-
-    await queryInterface.removeIndex('publish', 'publish_operation_id_index');
-
-    await queryInterface.removeIndex('update', 'update_operation_id_index');
-
-    await queryInterface.removeIndex('publish_paranet', 'publish_paranet_operation_id_index');
-
-    await queryInterface.removeIndex('get', 'get_created_at_index');
-
-    await queryInterface.removeIndex('publish', 'publish_created_at_index');
-
-    await queryInterface.removeIndex('update', 'update_created_at_index');
-
-    await queryInterface.removeIndex('publish_paranet', 'publish_paranet_created_at_index');
-
-    await queryInterface.removeIndex('get_response', 'get_response_operation_id_index');
-
-    await queryInterface.removeIndex('publish_response', 'publish_response_operation_id_index');
-
-    await queryInterface.removeIndex('update_response', 'update_response_operation_id_index');
-
-    await queryInterface.removeIndex(
-        'publish_paranet_response',
-        'publish_paranet_response_operation_id_index',
-    );
-
-    await queryInterface.removeIndex('get_response', 'get_response_created_at_index');
-
-    await queryInterface.removeIndex('publish_response', 'publish_response_created_at_index');
-
-    await queryInterface.removeIndex('update_response', 'update_response_created_at_index');
+    const indexes = [
+        { table: 'shard', name: 'shard_blockchain_id_index' },
+        { table: 'shard', name: 'last_dialed_index' },
+        { table: 'paranet_synced_asset', name: 'paranet_synced_asset_ual_index' },
+        { table: 'paranet_synced_asset', name: 'paranet_ual_data_source_index' },
+        { table: 'paranet', name: 'blockchain_id_paranet_id_index' },
+        { table: 'missed_paranet_asset', name: 'paranet_ual_index' },
+        { table: 'missed_paranet_asset', name: 'missed_paranet_asset_ual_index' },
+        { table: 'event', name: 'name_timestamp_index' },
+        { table: 'event', name: 'event_operation_id_index' },
+        { table: 'commands', name: 'name_status_index' },
+        { table: 'commands', name: 'status_started_at_index' },
+        { table: 'get', name: 'get_operation_id_index' },
+        { table: 'publish', name: 'publish_operation_id_index' },
+        { table: 'update', name: 'update_operation_id_index' },
+        { table: 'publish_paranet', name: 'publish_paranet_operation_id_index' },
+        { table: 'get', name: 'get_created_at_index' },
+        { table: 'publish', name: 'publish_created_at_index' },
+        { table: 'update', name: 'update_created_at_index' },
+        { table: 'publish_paranet', name: 'publish_paranet_created_at_index' },
+        { table: 'get_response', name: 'get_response_operation_id_index' },
+        { table: 'publish_response', name: 'publish_response_operation_id_index' },
+        { table: 'update_response', name: 'update_response_operation_id_index' },
+        {
+            table: 'publish_paranet_response',
+            name: 'publish_paranet_response_operation_id_index',
+        },
+        { table: 'get_response', name: 'get_response_created_at_index' },
+        { table: 'publish_response', name: 'publish_response_created_at_index' },
+        { table: 'update_response', name: 'update_response_created_at_index' },
+        {
+            table: 'publish_paranet_response',
+            name: 'publish_paranet_response_created_at_index',
+        },
+        { table: 'blockchain', name: 'contract_index' },
+    ];

-    await queryInterface.removeIndex(
-        'publish_paranet_response',
-        'publish_paranet_response_created_at_index',
-    );
+    for (const { table, name } of indexes) {
+        // eslint-disable-next-line no-await-in-loop
+        const [indexExists] = await queryInterface.sequelize.query(`
+            SELECT COUNT(*) AS index_exists
+            FROM information_schema.statistics
+            WHERE table_schema = DATABASE()
+            AND table_name = '${table}'
+            AND index_name = '${name}';
+        `);

-    await queryInterface.removeIndex('blockchain', 'contract_index');
+        if (indexExists[0].index_exists > 0) {
+            // eslint-disable-next-line no-await-in-loop
+            await queryInterface.removeIndex(table, name);
+        }
+    }
 }
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241125151200-rename-keyword-column-to-datasetroot-in-responses.js b/src/modules/repository/implementation/sequelize/migrations/20241125151200-rename-keyword-column-to-datasetroot-in-responses.js
index 0ac4d26f1..7f8046cbf 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241125151200-rename-keyword-column-to-datasetroot-in-responses.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241125151200-rename-keyword-column-to-datasetroot-in-responses.js
@@ -1,9 +1,30 @@
 export async function up({ context: { queryInterface } }) {
-    await queryInterface.renameColumn('publish_response', 'keyword', 'dataset_root');
-    await queryInterface.renameColumn('get_response', 'keyword', 'dataset_root');
+    // Helper function to check if a column exists
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('publish_response', 'keyword')) {
+        await queryInterface.renameColumn('publish_response', 'keyword', 'dataset_root');
+    }
+
+    if (await columnExists('get_response', 'keyword')) {
+        await queryInterface.renameColumn('get_response', 'keyword', 'dataset_root');
+    }
 }

 export async function down({ context: { queryInterface } }) {
-    await queryInterface.renameColumn('publish_response', 'dataset_root', 'keyword');
-    await queryInterface.renameColumn('get_response', 'dataset_root', 'keyword');
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('publish_response', 'dataset_root')) {
+        await queryInterface.renameColumn('publish_response', 'dataset_root', 'keyword');
+    }
+
+    if (await columnExists('get_response', 'dataset_root')) {
+        await queryInterface.renameColumn('get_response', 'dataset_root', 'keyword');
+    }
 }
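Several migrations below repeat the same `columnExists` helper; shown once here on its own. `describeTable` resolves to an object keyed by column name, so an own-property check is sufficient:

```js
async function columnExists(queryInterface, table, column) {
    const tableDescription = await queryInterface.describeTable(table);
    return Object.prototype.hasOwnProperty.call(tableDescription, column);
}
```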
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js b/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js
index 09e969fb8..d397c5235 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241126114400-add-commands-priority.js
@@ -1,9 +1,23 @@
 export async function up({ context: { queryInterface, Sequelize } }) {
-    await queryInterface.addColumn('commands', 'priority', {
-        type: Sequelize.BIGINT,
-    });
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (!(await columnExists('commands', 'priority'))) {
+        await queryInterface.addColumn('commands', 'priority', {
+            type: Sequelize.BIGINT,
+        });
+    }
 }

 export async function down({ context: { queryInterface } }) {
-    await queryInterface.removeColumn('commands', 'priority');
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('commands', 'priority')) {
+        await queryInterface.removeColumn('commands', 'priority');
+    }
 }
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js b/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js
index 16c78763b..7b47ca2cc 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241129120000-add-commands-is_blocking.js
@@ -1,9 +1,23 @@
 export async function up({ context: { queryInterface, Sequelize } }) {
-    await queryInterface.addColumn('commands', 'is_blocking', {
-        type: Sequelize.BOOLEAN,
-    });
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (!(await columnExists('commands', 'is_blocking'))) {
+        await queryInterface.addColumn('commands', 'is_blocking', {
+            type: Sequelize.BOOLEAN,
+        });
+    }
 }

 export async function down({ context: { queryInterface } }) {
-    await queryInterface.removeColumn('commands', 'is_blocking');
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('commands', 'is_blocking')) {
+        await queryInterface.removeColumn('commands', 'is_blocking');
+    }
 }
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241129125800-remove-datasetroot-response-table.js b/src/modules/repository/implementation/sequelize/migrations/20241129125800-remove-datasetroot-response-table.js
index d80ed68fa..c667a26a5 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241129125800-remove-datasetroot-response-table.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241129125800-remove-datasetroot-response-table.js
@@ -1,15 +1,35 @@
 export async function up({ context: { queryInterface } }) {
-    await queryInterface.removeColumn('publish_response', 'dataset_root');
-    await queryInterface.removeColumn('get_response', 'dataset_root');
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('publish_response', 'dataset_root')) {
+        await queryInterface.removeColumn('publish_response', 'dataset_root');
+    }
+
+    if (await columnExists('get_response', 'dataset_root')) {
+        await queryInterface.removeColumn('get_response', 'dataset_root');
+    }
 }

 export async function down({ context: { queryInterface, Sequelize } }) {
-    await queryInterface.addColumn('publish_response', 'dataset_root', {
-        type: Sequelize.STRING,
-        allowNull: false,
-    });
-    await queryInterface.addColumn('get_response', 'dataset_root', {
-        type: Sequelize.STRING,
-        allowNull: false,
-    });
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (!(await columnExists('publish_response', 'dataset_root'))) {
+        await queryInterface.addColumn('publish_response', 'dataset_root', {
+            type: Sequelize.STRING,
+            allowNull: false,
+        });
+    }
+
+    if (!(await columnExists('get_response', 'dataset_root'))) {
+        await queryInterface.addColumn('get_response', 'dataset_root', {
+            type: Sequelize.STRING,
+            allowNull: false,
+        });
+    }
 }
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js b/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js
index d4909d2ca..4109dd20b 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241201152000-update-blockchain-events.js
@@ -1,37 +1,70 @@
 export async function up({ context: { queryInterface, Sequelize } }) {
-    await queryInterface.renameColumn('blockchain_event', 'blockchain_id', 'blockchain');
+    // Helper function to check if a column exists
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }

-    await queryInterface.changeColumn('blockchain_event', 'block', {
-        type: Sequelize.BIGINT,
-    });
+    if (await columnExists('blockchain_event', 'blockchain_id')) {
+        await queryInterface.renameColumn('blockchain_event', 'blockchain_id', 'blockchain');
+    }

-    await queryInterface.renameColumn('blockchain_event', 'block', 'block_number');
+    if (await columnExists('blockchain_event', 'block')) {
+        await queryInterface.changeColumn('blockchain_event', 'block', {
+            type: Sequelize.BIGINT,
+        });

-    await queryInterface.addColumn('blockchain_event', 'transaction_index', {
-        type: Sequelize.BIGINT,
-    });
+        await queryInterface.renameColumn('blockchain_event', 'block', 'block_number');
+    }

-    await queryInterface.addColumn('blockchain_event', 'log_index', {
-        type: Sequelize.BIGINT,
-    });
+    if (!(await columnExists('blockchain_event', 'transaction_index'))) {
+        await queryInterface.addColumn('blockchain_event', 'transaction_index', {
+            type: Sequelize.BIGINT,
+        });
+    }

-    await queryInterface.addColumn('blockchain_event', 'contract_address', {
-        type: Sequelize.STRING,
-    });
+    if (!(await columnExists('blockchain_event', 'log_index'))) {
+        await queryInterface.addColumn('blockchain_event', 'log_index', {
+            type: Sequelize.BIGINT,
+        });
+    }
+
+    if (!(await columnExists('blockchain_event', 'contract_address'))) {
+        await queryInterface.addColumn('blockchain_event', 'contract_address', {
+            type: Sequelize.STRING,
+        });
+    }
 }

 export async function down({ context: { queryInterface, Sequelize } }) {
-    await queryInterface.renameColumn('blockchain_event', 'block_number', 'block');
+    async function columnExists(table, column) {
+        const tableDescription = await queryInterface.describeTable(table);
+        return Object.prototype.hasOwnProperty.call(tableDescription, column);
+    }
+
+    if (await columnExists('blockchain_event', 'block_number')) {
+        await queryInterface.renameColumn('blockchain_event', 'block_number', 'block');
+    }

-    await queryInterface.changeColumn('blockchain_event', 'block', {
-        type: Sequelize.INTEGER,
-    });
+    if (await columnExists('blockchain_event', 'block')) {
+        await queryInterface.changeColumn('blockchain_event', 'block', {
+            type: Sequelize.INTEGER,
+        });
+    }

-    await queryInterface.renameColumn('blockchain_event', 'blockchain', 'blockchain_id');
+    if (await columnExists('blockchain_event', 'blockchain')) {
+        await queryInterface.renameColumn('blockchain_event', 'blockchain', 'blockchain_id');
+    }

-    await queryInterface.removeColumn('blockchain_event', 'transaction_index');
+    if (await columnExists('blockchain_event', 'transaction_index')) {
+        await queryInterface.removeColumn('blockchain_event', 'transaction_index');
+    }

-    await queryInterface.removeColumn('blockchain_event', 'log_index');
+    if (await columnExists('blockchain_event', 'log_index')) {
+        await queryInterface.removeColumn('blockchain_event', 'log_index');
+    }

-    await queryInterface.removeColumn('blockchain_event', 'contract_address');
+    if (await columnExists('blockchain_event', 'contract_address')) {
+        await queryInterface.removeColumn('blockchain_event', 'contract_address');
+    }
 }
diff --git a/src/modules/repository/implementation/sequelize/migrations/20241211204400-rename-ask.js b/src/modules/repository/implementation/sequelize/migrations/20241211204400-rename-ask.js
index 954f87a84..801966fa1 100644
--- a/src/modules/repository/implementation/sequelize/migrations/20241211204400-rename-ask.js
+++ b/src/modules/repository/implementation/sequelize/migrations/20241211204400-rename-ask.js
@@ -1,9 +1,39 @@
 export async function up({ context: { queryInterface } }) {
-    await queryInterface.renameTable('finality', 'ask');
-    await queryInterface.renameTable('finality_response', 'ask_response');
+    async function tableExists(table) {
+        const [results] = await queryInterface.sequelize.query(`
+            SELECT COUNT(*) AS table_exists
+            FROM information_schema.tables
+            WHERE table_schema = DATABASE()
+            AND table_name = '${table}';
+        `);
+        return results[0].table_exists > 0;
+    }
+
+    if (await tableExists('finality')) {
+        await queryInterface.renameTable('finality', 'ask');
+    }
+
+    if (await tableExists('finality_response')) {
+        await queryInterface.renameTable('finality_response', 'ask_response');
+    }
 }

 export async function down({ context: { queryInterface } }) {
-    await queryInterface.renameTable('ask', 'finality');
-    await queryInterface.renameTable('ask_response', 'finality_response');
+    async function tableExists(table) {
+        const [results] = await queryInterface.sequelize.query(`
+            SELECT COUNT(*) AS table_exists
+            FROM information_schema.tables
+            WHERE table_schema = DATABASE()
+            AND table_name = '${table}';
+        `);
+        return results[0].table_exists > 0;
+    }
+
+    if (await tableExists('ask')) {
+        await queryInterface.renameTable('ask', 'finality');
+    }
+
+    if (await tableExists('ask_response')) {
+        await queryInterface.renameTable('ask_response', 'finality_response');
+    }
 }
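The `ot-triple-store.js` change below adds two Comunica context entries: `httpTimeout` bounds each HTTP request to the SPARQL endpoint (in milliseconds), and `httpBodyTimeout: true` extends that bound to cover streaming the response body. The same keys on a bare query engine, against a hypothetical endpoint:

```js
import { QueryEngine } from '@comunica/query-sparql';

const engine = new QueryEngine();
const bindingsStream = await engine.queryBindings('SELECT * WHERE { ?s ?p ?o } LIMIT 1', {
    sources: [{ type: 'sparql', value: 'http://localhost:3030/ds/sparql' }], // hypothetical endpoint
    httpTimeout: 60_000, // abort requests that exceed 60 s
    httpBodyTimeout: true, // count body streaming against the same budget
});
```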
diff --git a/src/modules/triple-store/implementation/ot-triple-store.js b/src/modules/triple-store/implementation/ot-triple-store.js
index 53c89af9c..82b133475 100644
--- a/src/modules/triple-store/implementation/ot-triple-store.js
+++ b/src/modules/triple-store/implementation/ot-triple-store.js
@@ -68,9 +68,13 @@ class OtTripleStore {
                 type: 'sparql',
                 value: this.repositories[repository].sparqlEndpointUpdate,
             },
+            httpTimeout: 60_000,
+            httpBodyTimeout: true,
         };
         this.repositories[repository].queryContext = {
             sources,
+            httpTimeout: 60_000,
+            httpBodyTimeout: true,
         };
     }
 }
diff --git a/src/service/triple-store-service.js b/src/service/triple-store-service.js
index 8ab9214ac..59daaea1c 100644
--- a/src/service/triple-store-service.js
+++ b/src/service/triple-store-service.js
@@ -37,7 +37,7 @@ class TripleStoreService {
         repository,
         knowledgeCollectionUAL,
         triples,
-        retries = 1,
+        retries = 5,
         retryDelay = 0,
     ) {
         this.logger.info(
diff --git a/v8-data-migration/constants.js b/v8-data-migration/constants.js
index 53f84f4ef..32b10c37f 100644
--- a/v8-data-migration/constants.js
+++ b/v8-data-migration/constants.js
@@ -20,12 +20,13 @@ export const VISIBILITY = {
 };

 export const BATCH_SIZE = 50;
-export const DEFAULT_CONFIG_PATH = '/root/ot-node/current/config/config.json';
-export const NODERC_CONFIG_PATH = '/root/ot-node/.origintrail_noderc';
-export const DATA_MIGRATION_DIR = '/root/ot-node/data/data-migration';
-export const LOG_DIR = '/root/ot-node/data/data-migration/logs';
-export const ENV_PATH = '/root/ot-node/current/.env';
-export const MIGRATION_DIR = '/root/ot-node/data/migrations/';
+export const MAIN_DIR = '/root';
+export const DEFAULT_CONFIG_PATH = `${MAIN_DIR}/ot-node/current/config/config.json`;
+export const NODERC_CONFIG_PATH = `${MAIN_DIR}/ot-node/.origintrail_noderc`;
+export const DATA_MIGRATION_DIR = `${MAIN_DIR}/ot-node/data/data-migration`;
+export const LOG_DIR = `${MAIN_DIR}/ot-node/data/data-migration/logs`;
+export const ENV_PATH = `${MAIN_DIR}/ot-node/current/.env`;
+export const MIGRATION_DIR = `${MAIN_DIR}/ot-node/data/migrations/`;
 export const MIGRATION_PROGRESS_FILE = 'v8DataMigration';

 export const DB_URLS = {
diff --git a/v8-data-migration/run-data-migration.sh b/v8-data-migration/run-data-migration.sh
index 3b8d1c243..9e46cab41 100644
--- a/v8-data-migration/run-data-migration.sh
+++ b/v8-data-migration/run-data-migration.sh
@@ -1,3 +1,5 @@
-cd /root/ot-node/current/v8-data-migration/ &&
+MAIN_DIR=/root
+
+cd $MAIN_DIR/ot-node/current/v8-data-migration/ &&
 npm rebuild sqlite3 &&
-nohup node v8-data-migration.js > /root/ot-node/data/nohup.out 2>&1 &
\ No newline at end of file
+nohup node v8-data-migration.js > $MAIN_DIR/ot-node/data/nohup.out 2>&1 &
\ No newline at end of file
diff --git a/v8-data-migration/sqlite-utils.js b/v8-data-migration/sqlite-utils.js
index 3bc682fb2..d710a4c77 100644
--- a/v8-data-migration/sqlite-utils.js
+++ b/v8-data-migration/sqlite-utils.js
@@ -27,6 +27,23 @@ export class SqliteDatabase {
         }
     }

+    async checkIntegrity() {
+        this._validateConnection();
+
+        try {
+            const result = await this.db.get('PRAGMA integrity_check;');
+            if (result.integrity_check === 'ok') {
+                logger.info('Database integrity check passed.');
+                return true;
+            }
+            logger.error('Database integrity check failed:', result.integrity_check);
+            return false;
+        } catch (error) {
+            logger.error('Error during integrity check:', error.message);
+            return false;
+        }
+    }
+
     async getTableExists(blockchainName) {
         this._validateConnection();
         this._validateBlockchainName(blockchainName);
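`checkIntegrity` above relies on SQLite's `PRAGMA integrity_check`, which yields a single row whose `integrity_check` column is the literal string 'ok' on a healthy database. Minimal standalone usage, assuming the same `sqlite`/`sqlite3` driver pair the migration uses:

```js
import sqlite3 from 'sqlite3';
import { open } from 'sqlite';

const db = await open({ filename: './example.db', driver: sqlite3.Database }); // hypothetical path
const result = await db.get('PRAGMA integrity_check;');
console.log(result.integrity_check === 'ok' ? 'healthy' : `corrupt: ${result.integrity_check}`);
await db.close();
```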
diff --git a/v8-data-migration/v8-data-migration-utils.js b/v8-data-migration/v8-data-migration-utils.js
index 2a8f1731c..ff0e97155 100644
--- a/v8-data-migration/v8-data-migration-utils.js
+++ b/v8-data-migration/v8-data-migration-utils.js
@@ -73,3 +73,17 @@ export function markMigrationAsSuccessfull() {
     // close file
     fs.closeSync(file);
 }
+
+export function deleteFile(filePath) {
+    if (fs.existsSync(filePath)) {
+        fs.unlinkSync(filePath);
+        logger.info(`Deleted file: ${filePath}`);
+
+        if (fs.existsSync(filePath)) {
+            logger.error(`File: ${filePath} still exists after deletion.`);
+            process.exit(1);
+        }
+    } else {
+        logger.info(`Did not delete file: ${filePath} because it does not exist.`);
+    }
+}
diff --git a/v8-data-migration/v8-data-migration.js b/v8-data-migration/v8-data-migration.js
index 7dff4b6b4..82437c6fb 100644
--- a/v8-data-migration/v8-data-migration.js
+++ b/v8-data-migration/v8-data-migration.js
@@ -10,6 +10,7 @@ import {
     ensureDirectoryExists,
     ensureMigrationProgressFileExists,
     markMigrationAsSuccessfull,
+    deleteFile,
 } from './v8-data-migration-utils.js';
 import {
     getAssertionFromV6TripleStore,
@@ -263,6 +264,69 @@ async function getAssertionsInBatch(
     return v6Assertions;
 }

+async function downloadDb(dbFilePath) {
+    logger.time(`Database file downloading time`);
+    const maxAttempts = 3;
+    for (let i = 0; i < maxAttempts; i += 1) {
+        // Fetch the db file from the remote server
+        logger.info(
+            `Fetching ${process.env.NODE_ENV}.db file from ${DB_URLS[process.env.NODE_ENV]}. Try ${
+                i + 1
+            } of 3. This may take a while...`,
+        );
+
+        try {
+            const writer = fs.createWriteStream(dbFilePath);
+            const response = await axios({
+                url: DB_URLS[process.env.NODE_ENV],
+                method: 'GET',
+                responseType: 'stream',
+            });
+
+            // Pipe the response stream to the file
+            response.data.pipe(writer);
+
+            await new Promise((resolve, reject) => {
+                let downloadComplete = false;
+
+                response.data.on('end', () => {
+                    downloadComplete = true;
+                });
+
+                writer.on('finish', resolve);
+                writer.on('error', (err) =>
+                    reject(new Error(`Write stream error: ${err.message}`)),
+                );
+                response.data.on('error', (err) =>
+                    reject(new Error(`Download stream error: ${err.message}`)),
+                );
+                response.data.on('close', () => {
+                    if (!downloadComplete) {
+                        reject(new Error('Download stream closed before completing'));
+                    }
+                });
+            });
+            if (fs.existsSync(dbFilePath)) {
+                logger.info(`DB file downloaded successfully`);
+                break;
+            }
+            logger.error(`DB file for ${process.env.NODE_ENV} is not present after download.`);
+        } catch (error) {
+            logger.error(`Error downloading DB file: ${error.message}`);
+        }
+
+        logger.info('Deleting downloaded db file to prevent data corruption');
+        deleteFile(dbFilePath);
+
+        if (i === maxAttempts - 1) {
+            logger.error('Max db download attempts reached. Terminating process...');
+            process.exit(1);
+        }
+        logger.info(`Retrying db download...`);
+    }
+    logger.timeEnd(`Database file downloading time`);
+}
+
 async function main() {
     ensureMigrationProgressFileExists();
@@ -318,66 +382,37 @@ async function main() {
     // Ensure connections
     await ensureConnections(tripleStoreRepositories, tripleStoreImplementation);

-    // Check if db exists and if it doesn't download it to the relevant directory
-    const dbFilePath = path.join(DATA_MIGRATION_DIR, `${process.env.NODE_ENV}.db`);
-    if (!fs.existsSync(dbFilePath)) {
-        logger.info(
-            `DB file for ${process.env.NODE_ENV} does not exist in ${DATA_MIGRATION_DIR}. Downloading it...`,
-        );
-        // Fetch the db file from the remote server
-        logger.info(
-            `Fetching ${process.env.NODE_ENV}.db file from ${
-                DB_URLS[process.env.NODE_ENV]
-            }. This may take a while...`,
-        );
-        logger.time(`Database file downloading time`);
-        const writer = fs.createWriteStream(dbFilePath);
-        const response = await axios({
-            url: DB_URLS[process.env.NODE_ENV],
-            method: 'GET',
-            responseType: 'stream',
-        });
-
-        // Pipe the response stream to the file
-        response.data.pipe(writer);
-        // Wait for the file to finish downloading
-        try {
-            await new Promise((resolve, reject) => {
-                let downloadComplete = false;
+    const maxAttempts = 2;
+    for (let i = 0; i < maxAttempts; i += 1) {
+        // Check if db exists and if it doesn't download it to the relevant directory
+        const dbFilePath = path.join(DATA_MIGRATION_DIR, `${process.env.NODE_ENV}.db`);
+        if (!fs.existsSync(dbFilePath)) {
+            logger.info(
+                `DB file for ${process.env.NODE_ENV} does not exist in ${DATA_MIGRATION_DIR}. Downloading it...`,
+            );
+            await downloadDb(dbFilePath);
+        }

-                response.data.on('end', () => {
-                    downloadComplete = true;
-                });
+        logger.info('Initializing SQLite database');
+        await sqliteDb.initialize();

-                writer.on('finish', resolve);
-                writer.on('error', (err) =>
-                    reject(new Error(`Write stream error: ${err.message}`)),
-                );
-                response.data.on('error', (err) =>
-                    reject(new Error(`Download stream error: ${err.message}`)),
-                );
-                response.data.on('close', () => {
-                    if (!downloadComplete) {
-                        reject(new Error('Download stream closed before completing'));
-                    }
-                });
-            });
-        } catch (error) {
-            logger.error(`Critical error during download: ${error.message}`);
-            logger.error('Terminating process to prevent data corruption');
-            process.exit(1);
-        }
-        logger.timeEnd(`Database file downloading time`);
+        // Check if db is corrupted and handle accordingly
+        const integrityCheck = await sqliteDb.checkIntegrity();
+        if (!integrityCheck) {
+            await sqliteDb.close();
+            logger.info('Db integrity check failed. Deleting corrupt db file.');
+            deleteFile(dbFilePath);

-        if (!fs.existsSync(dbFilePath)) {
-            throw new Error(`DB file for ${process.env.NODE_ENV} could not be created.`);
+            if (i === maxAttempts - 1) {
+                logger.error('Db integrity check failed. Terminating process...');
+                process.exit(1);
+            }
+            logger.info(`Retrying db download and integrity check...`);
+            continue;
         }
+        break;
     }

-    // Initialize SQLite database once before processing blockchains
-    logger.info('Initializing SQLite database');
-    await sqliteDb.initialize();
-
     try {
         // make sure blockchains are always migrated in this order - base, gnosis, neuroweb
         const sortedBlockchains = Object.keys(blockchainConfig.implementation).sort();
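The rewritten `main()` above wraps download and integrity check in a two-pass loop. Its control flow, reduced to a standalone sketch with injected stubs (hypothetical names for fs.existsSync, downloadDb, sqliteDb.checkIntegrity and deleteFile):

```js
async function ensureHealthyDb({ exists, download, checkIntegrity, remove }, maxAttempts = 2) {
    for (let i = 0; i < maxAttempts; i += 1) {
        if (!(await exists())) await download(); // fetch the db only when missing
        if (await checkIntegrity()) return; // healthy: proceed with the migration
        await remove(); // corrupt file: drop it and force a re-download
    }
    throw new Error('Db integrity check failed after retries.'); // main() exits instead
}
```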