diff --git a/package-lock.json b/package-lock.json index 6b8897dbf8..3b5107d498 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.0.3", + "version": "6.0.3+hotfix.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.0.3", + "version": "6.0.3+hotfix.1", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index dfa506fb03..44c10e42a6 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.3", + "version": "6.0.3+hotfix.1", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/scripts/copy-assertions.js b/scripts/copy-assertions.js new file mode 100644 index 0000000000..a107f6a88b --- /dev/null +++ b/scripts/copy-assertions.js @@ -0,0 +1,154 @@ +import 'dotenv/config'; +import fs from 'fs-extra'; +import rc from 'rc'; +import appRootPath from 'app-root-path'; +import path from 'path'; +import { TRIPLE_STORE_REPOSITORIES, SCHEMA_CONTEXT } from '../src/constants/constants.js'; +import TripleStoreModuleManager from '../src/modules/triple-store/triple-store-module-manager.js'; +import DataService from '../src/service/data-service.js'; +import Logger from '../src/logger/logger.js'; + +const { readFile } = fs; +const generalConfig = JSON.parse(await readFile(path.join(appRootPath.path, 'config/config.json'))); +const pjson = JSON.parse(await readFile(path.join(appRootPath.path, 'package.json'))); + +const defaultConfig = generalConfig[process.env.NODE_ENV]; + +const config = rc(pjson.name, defaultConfig); +const logger = new Logger(config.loglevel); + +const tripleStoreModuleManager = new TripleStoreModuleManager({ config, logger }); +await tripleStoreModuleManager.initialize(); +const dataService = new DataService({ config, logger }); + +const repositoryImplementations = {}; +for (const implementationName of tripleStoreModuleManager.getImplementationNames()) { + for (const repository in tripleStoreModuleManager.getImplementation(implementationName).module + .repositories) { + repositoryImplementations[repository] = implementationName; + } +} + +const fromRepository = TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT; +const fromImplementation = repositoryImplementations[TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT]; +const fromRepositoryName = + tripleStoreModuleManager.getImplementation(fromImplementation).module.repositories[ + fromRepository + ].name; + +const toRepository = TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT; +const toImplementation = repositoryImplementations[TRIPLE_STORE_REPOSITORIES.PRIVATE_CURRENT]; +const toRepositoryName = + tripleStoreModuleManager.getImplementation(toImplementation).module.repositories[toRepository] + .name; + +async function getAssertions(implementation, repository) { + const graphs = await tripleStoreModuleManager.select( + implementation, + repository, + `SELECT DISTINCT ?g + WHERE { + GRAPH ?g { ?s ?p ?o } + }`, + ); + + return (graphs ?? []).filter(({ g }) => g.startsWith('assertion:')).map(({ g }) => g); +} + +function logPercentage(index, max) { + const previousPercentage = (Math.max(0, index - 1) / max) * 100; + const currentPercentage = (index / max) * 100; + + if (Math.floor(currentPercentage) - Math.floor(previousPercentage) < 1) return; + + logger.debug(`Migration at ${Math.floor(currentPercentage * 10) / 10}%`); +} + +let toRepositoryAssertions = await getAssertions(toImplementation, toRepository); +logger.info( + `${toRepositoryAssertions.length} assertions found in ${toRepository} repository before migration`, +); + +logger.info( + `Starting to copy assertions from ${fromImplementation} repository ${fromRepository} with name ${fromRepositoryName} to repository ${toImplementation} repository ${toRepository} with name ${toRepositoryName}`, +); + +const fromRepositoryAssertions = await getAssertions(fromImplementation, fromRepository); +logger.info(`${fromRepositoryAssertions.length} assertions found in ${fromRepository}`); + +let completed = 0; +const copyAssertion = async (g) => { + if (!toRepositoryAssertions.includes(g)) { + let nquads; + try { + nquads = await tripleStoreModuleManager.construct( + fromImplementation, + fromRepository, + `PREFIX schema: <${SCHEMA_CONTEXT}> + CONSTRUCT { ?s ?p ?o } + WHERE { + { + GRAPH <${g}> + { + ?s ?p ?o . + } + } + }`, + ); + + nquads = await dataService.toNQuads(nquads, 'application/n-quads'); + } catch (error) { + logger.error( + `Error while getting assertion ${g.substring( + 'assertion:'.length, + )} from ${fromImplementation} repository ${fromRepository} with name ${fromRepositoryName}. Error: ${ + error.message + }`, + ); + process.exit(1); + } + + try { + await tripleStoreModuleManager.insertAssertion( + toImplementation, + toRepository, + g.substring('assertion:'.length), + nquads.join('\n'), + ); + } catch (error) { + logger.error( + `Error while inserting assertion ${g.substring( + 'assertion:'.length, + )} with nquads: ${nquads} in ${toImplementation} repository ${toRepository} with name ${toRepositoryName}. Error: ${ + error.message + }`, + ); + process.exit(1); + } + } + + completed += 1; + logPercentage(completed, fromRepositoryAssertions.length); +}; + +const start = Date.now(); +const concurrency = 10; +let promises = []; +for (let i = 0; i < fromRepositoryAssertions.length; i += 1) { + promises.push(copyAssertion(fromRepositoryAssertions[i])); + if (promises.length > concurrency) { + // eslint-disable-next-line no-await-in-loop + await Promise.all(promises); + promises = []; + } +} +await Promise.all(promises); + +const end = Date.now(); + +logger.info(`Migration completed! Lasted ${(end - start) / 1000} seconds.`); + +toRepositoryAssertions = await getAssertions(toImplementation, toRepository); +logger.info( + `${toRepositoryAssertions.length} assertions found in ${toRepository} repository after migration`, +); diff --git a/src/commands/local-store/local-store-command.js b/src/commands/local-store/local-store-command.js index 536cd45632..cb113b5f54 100644 --- a/src/commands/local-store/local-store-command.js +++ b/src/commands/local-store/local-store-command.js @@ -5,6 +5,7 @@ class LocalStoreCommand extends Command { constructor(ctx) { super(ctx); this.tripleStoreService = ctx.tripleStoreService; + this.operationIdService = ctx.operationIdService; this.errorType = ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR; } @@ -12,13 +13,14 @@ class LocalStoreCommand extends Command { async execute(command) { const { operationId } = command.data; + let assertions = []; try { await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.LOCAL_STORE.LOCAL_STORE_START, ); - const assertions = await this.operationIdService.getCachedOperationIdData(operationId); + assertions = await this.operationIdService.getCachedOperationIdData(operationId); await Promise.all( assertions.map(({ assertionId, assertion }) => @@ -43,9 +45,16 @@ class LocalStoreCommand extends Command { ); } catch (e) { await this.handleError(operationId, e.message, this.errorType, true); + return Command.empty(); } - return Command.empty(); + if (command?.sequence?.length) { + await this.operationIdService.cacheOperationIdData(operationId, { + assertion: assertions[0].assertion, + }); + } + + return this.continueSequence(command.data, command.sequence); } /** diff --git a/src/controllers/http-api/publish-http-api-controller.js b/src/controllers/http-api/publish-http-api-controller.js index c6f399efb2..d60f074d6e 100644 --- a/src/controllers/http-api/publish-http-api-controller.js +++ b/src/controllers/http-api/publish-http-api-controller.js @@ -31,22 +31,36 @@ class PublishController extends BaseController { const { assertion, assertionId, blockchain, contract, tokenId, hashFunctionId } = req.body; try { - await Promise.all([ - this.repositoryModuleManager.createOperationRecord( - this.operationService.getOperationName(), - operationId, - OPERATION_STATUS.IN_PROGRESS, - ), - this.operationIdService.cacheOperationIdData(operationId, { assertion }), - ]); + await this.repositoryModuleManager.createOperationRecord( + this.operationService.getOperationName(), + operationId, + OPERATION_STATUS.IN_PROGRESS, + ); this.logger.info( `Received asset with assertion id: ${assertionId}, blockchain: ${blockchain}, hub contract: ${contract}, token id: ${tokenId}`, ); + let commandSequence = []; + + if (req.body.localStore) { + commandSequence.push('localStoreCommand'); + await this.operationIdService.cacheOperationIdData(operationId, [ + { assertion, assertionId }, + ]); + } else { + await this.operationIdService.cacheOperationIdData(operationId, { assertion }); + } + + commandSequence = [ + ...commandSequence, + 'validateAssertionCommand', + 'networkPublishCommand', + ]; + await this.commandExecutor.add({ - name: 'validateAssertionCommand', - sequence: ['networkPublishCommand'], + name: commandSequence[0], + sequence: commandSequence.slice(1), delay: 0, period: 5000, retries: 3,