-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathot-node.js
395 lines (347 loc) · 16.7 KB
/
ot-node.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
import DeepExtend from 'deep-extend';
import rc from 'rc';
import EventEmitter from 'events';
import { createRequire } from 'module';
import { execSync } from 'child_process';
import DependencyInjection from './src/service/dependency-injection.js';
import Logger from './src/logger/logger.js';
import { MIN_NODE_VERSION, PARANET_ACCESS_POLICY } from './src/constants/constants.js';
import FileService from './src/service/file-service.js';
import OtnodeUpdateCommand from './src/commands/common/otnode-update-command.js';
import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-updater.js';
import MigrationExecutor from './src/migration/migration-executor.js';
// ES modules have no built-in `require`; build one bound to this file's URL so the
// CommonJS-style loads below (a core module and two JSON files) can be used here.
const require = createRequire(import.meta.url);
// Promise-returning `setTimeout` from `timers/promises`.
// NOTE: this shadows the global callback-style `setTimeout` for the whole module,
// so in this file it is used as `await setTimeout(ms)`.
const { setTimeout } = require('timers/promises');
// package.json: provides `version` (printed in the startup banner) and `name` (passed to `rc`).
const pjson = require('./package.json');
// Default configuration file; indexed by `process.env.NODE_ENV` when the node config is built.
const configjson = require('./config/config.json');
/**
 * Top-level orchestrator of an OriginTrail node process.
 *
 * The constructor performs the synchronous bootstrap (configuration, logger, file service,
 * auto-updater module and a node.js version check). `start()` then runs the asynchronous
 * startup sequence. Most startup failures are logged and terminate the process via `stop(1)`.
 */
class OTNode {
    /**
     * @param {object} [config] - Optional user configuration. When given it is deep-merged over
     *   the defaults for the current NODE_ENV; when omitted the configuration is loaded via `rc`.
     */
    constructor(config) {
        // Order matters: the logger needs the config, and the file service / auto-updater /
        // version check all need the logger.
        this.initializeConfiguration(config);
        this.initializeLogger();
        this.initializeFileService();
        this.initializeAutoUpdaterModule();
        this.checkNodeVersion();
    }

    /**
     * Runs the full startup sequence: update check, migration, banner, dependency container,
     * modules, services, blockchain profiles, command executor, routers and network module.
     *
     * @returns {Promise<void>}
     */
    async start() {
        await this.checkForUpdate();
        await this.removeUpdateFile();

        // NOTE(review): `this.container` is only assigned later in
        // initializeDependencyContainer(), so it is undefined at this point — confirm the
        // migration does not rely on it.
        await MigrationExecutor.executeTripleStoreUserConfigurationMigration(
            this.container,
            this.logger,
            this.config,
        );

        this.logger.info('██████╗ ██╗ ██╗ ██████╗ ██╗ ██╗ █████╗ ');
        this.logger.info('██╔══██╗██║ ██╔╝██╔════╝ ██║ ██║██╔══██╗');
        this.logger.info('██║ ██║█████╔╝ ██║ ███╗ ██║ ██║╚█████╔╝');
        this.logger.info('██║ ██║██╔═██╗ ██║ ██║ ╚██╗ ██╔╝██╔══██╗');
        this.logger.info('██████╔╝██║ ██╗╚██████╔╝ ╚████╔╝ ╚█████╔╝');
        this.logger.info('╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ╚═══╝ ╚════╝ ');

        this.logger.info('======================================================');
        this.logger.info(` OriginTrail Node v${pjson.version}`);
        this.logger.info('======================================================');
        this.logger.info(`Node is running in ${process.env.NODE_ENV} environment`);

        await this.initializeDependencyContainer();
        this.initializeEventEmitter();

        await this.initializeModules();

        this.initializeBlockchainEventsService();
        await this.initializeShardingTableService();
        await this.initializeParanets();

        await this.createProfiles();

        // The command queue is paused while commands are registered and only resumed once the
        // routers and the network module are up.
        await this.initializeCommandExecutor();
        await this.initializeRouters();
        await this.startNetworkModule();
        this.resumeCommandExecutor();

        this.logger.info('Node is up and running!');
    }

    /**
     * Logs the running node.js version and warns when its major version is below
     * MIN_NODE_VERSION.
     */
    checkNodeVersion() {
        // Compare as numbers on both sides; relying on implicit coercion would turn into a
        // lexicographic comparison ('18' < '9') if the constant were ever a string.
        const nodeMajorVersion = Number.parseInt(process.versions.node.split('.')[0], 10);
        const minimumMajorVersion = Number(MIN_NODE_VERSION);

        this.logger.warn('======================================================');
        this.logger.warn(`Using node.js version: ${process.versions.node}`);
        if (nodeMajorVersion < minimumMajorVersion) {
            this.logger.warn(
                `This node was tested with node.js version 16. To make sure that your node is running properly please update your node version!`,
            );
        }
        this.logger.warn('======================================================');
    }

    /** Creates the logger using the configured log level. */
    initializeLogger() {
        this.logger = new Logger(this.config.logLevel);
    }

    /** Creates the file service with the current config and logger. */
    initializeFileService() {
        this.fileService = new FileService({ config: this.config, logger: this.logger });
    }

    /**
     * Creates the auto-updater and initializes it with the `ot-auto-updater` implementation
     * config. This happens outside the dependency container, which does not exist yet.
     */
    initializeAutoUpdaterModule() {
        this.autoUpdaterModuleManager = new OtAutoUpdater();
        this.autoUpdaterModuleManager.initialize(
            this.config.modules.autoUpdater.implementation['ot-auto-updater'].config,
            this.logger,
        );
    }

    /**
     * Builds `this.config` from the defaults for the current NODE_ENV.
     *
     * @param {object} [userConfig] - When provided it is deep-merged over the defaults;
     *   otherwise `rc` resolves the configuration using the package name.
     */
    initializeConfiguration(userConfig) {
        // Deep-copy the defaults so merging never mutates the loaded config.json object.
        const defaultConfig = JSON.parse(JSON.stringify(configjson[process.env.NODE_ENV]));

        if (userConfig) {
            this.config = DeepExtend(defaultConfig, userConfig);
        } else {
            this.config = rc(pjson.name, defaultConfig);
        }

        if (!this.config.configFilename) {
            // set default user configuration filename
            this.config.configFilename = '.origintrail_noderc';
        }
    }

    /**
     * Creates the dependency injection container and registers `config` and `logger` in it.
     *
     * @returns {Promise<void>}
     */
    async initializeDependencyContainer() {
        this.container = await DependencyInjection.initialize();
        DependencyInjection.registerValue(this.container, 'config', this.config);
        DependencyInjection.registerValue(this.container, 'logger', this.logger);

        this.logger.info('Dependency injection module is initialized');
    }

    /**
     * Resolves `<moduleName>ModuleManager` for every key of `config.modules` and initializes
     * all of them concurrently. Any rejection is logged and stops the node with exit code 1.
     *
     * @returns {Promise<void>}
     */
    async initializeModules() {
        const initializationPromises = Object.keys(this.config.modules).map((moduleName) => {
            const moduleManagerName = `${moduleName}ModuleManager`;
            const moduleManager = this.container.resolve(moduleManagerName);
            return moduleManager.initialize();
        });

        try {
            await Promise.all(initializationPromises);
            this.logger.info(`All modules initialized!`);
        } catch (e) {
            this.logger.error(`Module initialization failed. Error message: ${e.message}`);
            this.stop(1);
        }
    }

    /** Registers a fresh EventEmitter in the container under the name `eventEmitter`. */
    initializeEventEmitter() {
        const eventEmitter = new EventEmitter();
        DependencyInjection.registerValue(this.container, 'eventEmitter', eventEmitter);

        this.logger.info('Event emitter initialized');
    }

    /**
     * Initializes the `httpApiRouter` and `rpcRouter` concurrently. A failure of either router
     * is logged together with its stack trace and stops the node with exit code 1.
     *
     * @returns {Promise<void>}
     */
    async initializeRouters() {
        try {
            this.logger.info('Initializing http api and rpc router');

            const routerNames = ['httpApiRouter', 'rpcRouter'];
            await Promise.all(
                routerNames.map(async (routerName) => {
                    const router = this.container.resolve(routerName);
                    try {
                        await router.initialize();
                    } catch (error) {
                        // `stack` is the standard Error property; `stackTrace` does not exist
                        // and would always be logged as "undefined".
                        this.logger.error(
                            `${routerName} initialization failed. Error message: ${error.message}, ${error.stack}`,
                        );
                        this.stop(1);
                    }
                }),
            );
            this.logger.info('Routers initialized successfully');
        } catch (error) {
            this.logger.error(`Failed to initialize routers: ${error.message}, ${error.stack}`);
            this.stop(1);
        }
    }

    /**
     * For every blockchain implementation: creates an on-chain profile when no identity exists
     * yet, otherwise checks that the on-chain node id matches the local peer id. An
     * implementation is removed when its peer id does not match or when any step throws.
     * If no implementation is left afterwards the node stops with exit code 1.
     *
     * @returns {Promise<void>}
     */
    async createProfiles() {
        const cryptoService = this.container.resolve('cryptoService');
        const blockchainModuleManager = this.container.resolve('blockchainModuleManager');
        const networkModuleManager = this.container.resolve('networkModuleManager');
        const peerId = networkModuleManager.getPeerId().toB58String();

        const createProfilesPromises = blockchainModuleManager
            .getImplementationNames()
            .map(async (blockchain) => {
                try {
                    const identityExists = await blockchainModuleManager.identityIdExists(
                        blockchain,
                    );

                    if (!identityExists) {
                        this.logger.info(`Creating profile on network: ${blockchain}`);
                        await blockchainModuleManager.createProfile(blockchain, peerId);

                        // Development/test only: set the initial stake and ask through the
                        // npm helper scripts.
                        if (
                            process.env.NODE_ENV === 'development' ||
                            process.env.NODE_ENV === 'test'
                        ) {
                            const blockchainConfig =
                                blockchainModuleManager.getModuleConfiguration(blockchain);
                            // NOTE(review): configuration values, including wallet private
                            // keys, are interpolated into a shell command line without
                            // escaping. Acceptable only because this branch is limited to
                            // development/test environments.
                            execSync(
                                `npm run set-stake -- --rpcEndpoint=${blockchainConfig.rpcEndpoints[0]} --stake=${blockchainConfig.initialStakeAmount} --operationalWalletPrivateKey=${blockchainConfig.operationalWallets[0].privateKey} --managementWalletPrivateKey=${blockchainConfig.evmManagementWalletPrivateKey} --hubContractAddress=${blockchainConfig.hubContractAddress}`,
                                { stdio: 'inherit' },
                            );
                            // Fixed 10 second pause between the two scripts.
                            await setTimeout(10000);
                            execSync(
                                `npm run set-ask -- --rpcEndpoint=${blockchainConfig.rpcEndpoints[0]} --ask=${blockchainConfig.initialAskAmount} --privateKey=${blockchainConfig.operationalWallets[0].privateKey} --hubContractAddress=${blockchainConfig.hubContractAddress}`,
                                { stdio: 'inherit' },
                            );
                        }
                    }

                    const identityId = await blockchainModuleManager.getIdentityId(blockchain);
                    this.logger.info(`Identity ID: ${identityId}`);

                    // Only a pre-existing identity is verified against the local peer id.
                    if (identityExists) {
                        const onChainNodeId = await blockchainModuleManager.getNodeId(
                            blockchain,
                            identityId,
                        );
                        const onChainPeerId = cryptoService.convertHexToAscii(onChainNodeId);

                        if (peerId !== onChainPeerId) {
                            this.logger.warn(
                                `Local peer id: ${peerId} doesn't match on chain peer id: ${onChainPeerId} for blockchain: ${blockchain}, identity id: ${identityId}.`,
                            );
                            blockchainModuleManager.removeImplementation(blockchain);
                        }
                    }
                } catch (error) {
                    this.logger.warn(
                        `Unable to create ${blockchain} blockchain profile. Removing implementation. Error: ${error.message}`,
                    );
                    blockchainModuleManager.removeImplementation(blockchain);
                }
            });

        await Promise.all(createProfilesPromises);

        if (!blockchainModuleManager.getImplementationNames().length) {
            this.logger.error(`Unable to create blockchain profiles. OT-node shutting down...`);
            this.stop(1);
        }
    }

    /**
     * Pauses the command queue, registers the default commands and starts replaying old
     * commands in the background (the replay is intentionally not awaited). A failure while
     * pausing or registering is logged and stops the node with exit code 1.
     *
     * @returns {Promise<void>}
     */
    async initializeCommandExecutor() {
        try {
            const commandExecutor = this.container.resolve('commandExecutor');
            commandExecutor.pauseQueue();
            await commandExecutor.addDefaultCommands();
            // Fire-and-forget: the replay runs in the background. The rejection handler is
            // required because a failure here happens outside the surrounding try/catch and
            // would otherwise surface as an unhandled promise rejection.
            commandExecutor
                .replayOldCommands()
                .then(() => this.logger.info('Finished replaying old commands'))
                .catch((error) =>
                    this.logger.error(
                        `Unable to replay old commands. Error message: ${error.message}`,
                    ),
                );
        } catch (e) {
            this.logger.error(
                `Command executor initialization failed. Error message: ${e.message}`,
            );
            this.stop(1);
        }
    }

    /** Resumes the command queue; a failure is logged and stops the node with exit code 1. */
    resumeCommandExecutor() {
        try {
            const commandExecutor = this.container.resolve('commandExecutor');
            commandExecutor.resumeQueue();
        } catch (e) {
            this.logger.error(
                `Unable to resume command executor queue. Error message: ${e.message}`,
            );
            this.stop(1);
        }
    }

    /**
     * Starts the network module. Errors are not caught here and propagate to the caller.
     *
     * @returns {Promise<void>}
     */
    async startNetworkModule() {
        const networkModuleManager = this.container.resolve('networkModuleManager');
        await networkModuleManager.start();
    }

    /**
     * Initializes the sharding table service; a failure is logged and stops the node with
     * exit code 1.
     *
     * @returns {Promise<void>}
     */
    async initializeShardingTableService() {
        try {
            const shardingTableService = this.container.resolve('shardingTableService');
            await shardingTableService.initialize();
            this.logger.info('Sharding Table Service initialized successfully');
        } catch (error) {
            this.logger.error(
                `Unable to initialize sharding table service. Error message: ${error.message} OT-node shutting down...`,
            );
            this.stop(1);
        }
    }

    /**
     * Initializes the blockchain events services; a synchronous failure is logged and stops
     * the node with exit code 1.
     */
    initializeBlockchainEventsService() {
        try {
            const blockchainEventsService = this.container.resolve('blockchainEventsService');
            // The return value is not awaited here.
            blockchainEventsService.initializeBlockchainEventsServices();
            this.logger.info('Blockchain Events Service initialized successfully');
        } catch (error) {
            this.logger.error(
                `Unable to initialize Blockchain Events Service. Error message: ${error.message} OT-node shutting down...`,
            );
            this.stop(1);
        }
    }

    /**
     * Removes the update file (best effort: a failure is only logged as a warning) and then
     * sets `config.otNodeUpdated` to true regardless of the outcome.
     *
     * @returns {Promise<void>}
     */
    async removeUpdateFile() {
        const updateFilePath = this.fileService.getUpdateFilePath();
        await this.fileService.removeFile(updateFilePath).catch((error) => {
            this.logger.warn(`Unable to remove update file. Error: ${error}`);
        });
        this.config.otNodeUpdated = true;
    }

    /**
     * Builds an OtnodeUpdateCommand by hand (the dependency container does not exist yet at
     * this point of startup) and executes it.
     *
     * @returns {Promise<void>}
     */
    async checkForUpdate() {
        const autoUpdaterCommand = new OtnodeUpdateCommand({
            logger: this.logger,
            config: this.config,
            fileService: this.fileService,
            autoUpdaterModuleManager: this.autoUpdaterModuleManager,
        });

        await autoUpdaterCommand.execute();
    }

    /**
     * Validates every paranet UAL listed in `config.assetSync.syncParanets`. A paranet is kept
     * only when the UAL is well-formed, its blockchain implementation is available, the paranet
     * exists on chain and — for a curated access policy — this node is a curated node. For each
     * kept paranet the triple store repository and the paranet record are initialized. The
     * configured list is then replaced by the kept UALs and the triple store repositories are
     * initialized.
     *
     * @returns {Promise<void>}
     */
    async initializeParanets() {
        const blockchainModuleManager = this.container.resolve('blockchainModuleManager');
        const tripleStoreService = this.container.resolve('tripleStoreService');
        const tripleStoreModuleManager = this.container.resolve('tripleStoreModuleManager');
        const paranetService = this.container.resolve('paranetService');
        const ualService = this.container.resolve('ualService');
        const validParanets = [];

        // A missing `assetSync` section or a falsy list both mean "no paranets to sync".
        const syncParanets = this.config.assetSync?.syncParanets || [];

        // Paranets are processed one at a time, hence the awaits inside the loop.
        for (const paranetUAL of syncParanets) {
            if (!ualService.isUAL(paranetUAL)) {
                this.logger.warn(
                    `Unable to initialize Paranet with id ${paranetUAL} because of invalid UAL format`,
                );
                continue;
            }

            const { blockchain, contract, knowledgeCollectionId } =
                ualService.resolveUAL(paranetUAL);
            if (!blockchainModuleManager.getImplementationNames().includes(blockchain)) {
                this.logger.warn(
                    `Unable to initialize Paranet with id ${paranetUAL} because of unsupported blockchain implementation`,
                );
                continue;
            }

            const paranetId = paranetService.constructParanetId(contract, knowledgeCollectionId);
            // eslint-disable-next-line no-await-in-loop
            const paranetExists = await blockchainModuleManager.paranetExists(
                blockchain,
                paranetId,
            );
            if (!paranetExists) {
                this.logger.warn(
                    `Unable to initialize Paranet with id ${paranetUAL} because it doesn't exist`,
                );
                continue;
            }

            // eslint-disable-next-line no-await-in-loop
            const nodesAccessPolicy = await blockchainModuleManager.getNodesAccessPolicy(
                blockchain,
                paranetId,
            );
            if (nodesAccessPolicy === PARANET_ACCESS_POLICY.CURATED) {
                // eslint-disable-next-line no-await-in-loop
                const identityId = await blockchainModuleManager.getIdentityId(blockchain);
                // eslint-disable-next-line no-await-in-loop
                const isCuratedNode = await blockchainModuleManager.isCuratedNode(
                    blockchain,
                    paranetId,
                    identityId,
                );
                if (!isCuratedNode) {
                    this.logger.warn(
                        `Unable to initialize Paranet with id ${paranetUAL} because node with id ${identityId} is not a curated node`,
                    );
                    continue;
                }
            }

            validParanets.push(paranetUAL);
            const repository = paranetService.getParanetRepositoryName(paranetUAL);
            // eslint-disable-next-line no-await-in-loop
            await tripleStoreModuleManager.initializeParanetRepository(repository);
            // eslint-disable-next-line no-await-in-loop
            await paranetService.initializeParanetRecord(blockchain, paranetId);
        }

        // Keep only the paranets that passed validation. Without an `assetSync` section there
        // is nothing to overwrite (and nothing was iterated above), so the write is skipped
        // instead of throwing on an undefined object.
        if (this.config.assetSync) {
            this.config.assetSync.syncParanets = validParanets;
        }
        tripleStoreService.initializeRepositories();
    }

    /**
     * Logs the shutdown and terminates the process.
     *
     * @param {number} [code=0] - Process exit code.
     */
    stop(code = 0) {
        this.logger.info('Stopping node...');
        process.exit(code);
    }
}
export default OTNode;