Skip to content

Commit

Permalink
Merge pull request #25 from digicatapult/feature/in-154
Browse files Browse the repository at this point in the history
Feature/in 154
  • Loading branch information
n3op2 authored May 9, 2022
2 parents b3c272d + 1bb34d9 commit e18f6d4
Show file tree
Hide file tree
Showing 18 changed files with 634 additions and 39 deletions.
2 changes: 2 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
"es6": true,
"node": true
},
"parser": "babel-eslint",
"parserOptions": {
"ecmaVersion": 9,
"sourceType": "module"
},
"ignorePatterns": ["**/test/__fixtures__/**/*"],
"rules": {
"prettier/prettier": "error",
"no-console": 2
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

Manages a go-ipfs instance maintaining the private network swarm key based on the value from a `dscp-node` instance.

## Local development
> install dependencies
```sh
npm i
```
> start substrate node using docker-compose
```sh
docker-compose up -d // -d for silent
```
> start ipfs nodejs wrapper
```sh
npm run dev
```

## Environment Variables

`dscp-ipfs` is configured primarily using environment variables as follows:
Expand Down
2 changes: 2 additions & 0 deletions app/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const vars = envalid.cleanEnv(
METADATA_KEY_LENGTH: envalid.num({ default: 32 }),
METADATA_VALUE_LITERAL_LENGTH: envalid.num({ default: 32 }),
PROCESS_IDENTIFIER_LENGTH: envalid.num({ default: 32 }),
HEALTHCHECK_POLL_PERIOD_MS: envalid.num({ default: 30 * 1000, devDefault: 1000 }),
HEALTHCHECK_TIMEOUT_MS: envalid.num({ default: 2 * 1000, devDefault: 1000 }),
},
{
strict: true,
Expand Down
22 changes: 22 additions & 0 deletions app/ipfs.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { spawn } = require('child_process')
const EventEmitter = require('events')

const { ConnectionError } = require('./utils/Errors')
const { IPFS_PATH, IPFS_EXECUTABLE, IPFS_ARGS, IPFS_LOG_LEVEL } = require('./env')
const logger = require('./logger')

Expand Down Expand Up @@ -51,6 +52,7 @@ async function setupIpfs() {
})

ipfs.on('close', unexpectedCloseListener)
that.ipfs = ipfs
},
stop: async () => {
logger.info('Stopping IPFS')
Expand Down Expand Up @@ -82,6 +84,26 @@ async function setupIpfs() {
return that
}

async function ipfsHealthCheack(api, name = 'ipfs') {
try {
if (!api || !api.pid) throw new ConnectionError({ name })
const { spawnfile, pid, killed } = api

return {
name,
status: 'up',
details: {
spawnfile,
pid,
killed,
},
}
} catch (error) {
return { name, status: 'error', error }
}
}

module.exports = {
setupIpfs,
ipfsHealthCheack,
}
27 changes: 27 additions & 0 deletions app/keyWatcher/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
const { createNodeApi } = require('./api')
const { setupKeyWatcher } = require('./keyWatcher')
const { ConnectionError } = require('../utils/Errors')

module.exports = {
setupKeyWatcher: async ({ onUpdate }) => {
const api = await createNodeApi()
await setupKeyWatcher(api)({ onUpdate })
return api
},
nodeHealthCheck: async (api, name = 'substrate') => {
try {
if (!(await api._isConnected)) throw new ConnectionError({ name })
const [chain, runtime] = await Promise.all([api._runtimeChain, api._runtimeVersion])

return {
name,
status: 'up',
details: {
chain,
runtime: {
name: runtime.specName,
versions: {
spec: runtime.specVersion.toNumber(),
impl: runtime.implVersion.toNumber(),
authoring: runtime.authoringVersion.toNumber(),
transaction: runtime.transactionVersion.toNumber(),
},
},
},
}
} catch (error) {
return { name, status: 'error', error }
}
},
}
54 changes: 29 additions & 25 deletions app/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,45 @@ const pinoHttp = require('pino-http')

const { PORT } = require('./env')
const logger = require('./logger')
const { setupKeyWatcher } = require('./keyWatcher')
const { setupIpfs } = require('./ipfs')
const { setupKeyWatcher, nodeHealthCheck } = require('./keyWatcher')
const { setupIpfs, ipfsHealthCheack } = require('./ipfs')
const ServiceWatcher = require('./utils/ServiceWatcher')

async function createHttpServer() {
const app = express()
const requestLogger = pinoHttp({ logger })
const ipfs = await setupIpfs()

await setupKeyWatcher({
const nodeApi = await setupKeyWatcher({
onUpdate: async (value) => {
await ipfs.stop()
await ipfs.start({ swarmKey: value })
},
})

await app.use((req, res, next) => {
// setup service watcher
// TODO add methdo foro addng service watcher so it can be done
// by calling sw.addService
const sw = new ServiceWatcher({
substrate: {
...nodeApi._api,
healthCheck: nodeHealthCheck,
},
ipfs: {
...ipfs,
healthCheck: ipfsHealthCheack,
},
})

app.use((req, res, next) => {
if (req.path !== '/health') requestLogger(req, res)
next()
})

app.get('/health', async (req, res) => {
res.status(200).send({ status: 'ok' })
const statusCode = Object.values(sw.report).some((srv) => ['down', 'error'].includes(srv.status)) ? 503 : 200

res.status(statusCode).send(sw.report)
})

// Sorry - app.use checks arity
Expand All @@ -38,35 +55,22 @@ async function createHttpServer() {
}
})

return { app, ipfs }
return { app, ipfs, sw }
}

/* istanbul ignore next */
async function startServer() {
try {
const { app, ipfs } = await createHttpServer()

const { app, ipfs, sw } = await createHttpServer()
const server = await new Promise((resolve, reject) => {
let resolved = false
const server = app.listen(PORT, (err) => {
if (err) {
if (!resolved) {
resolved = true
reject(err)
}
}
if (err) return reject(err)
logger.info(`Listening on port ${PORT} `)
if (!resolved) {
resolved = true
resolve(server)
}
})
server.on('error', (err) => {
if (!resolved) {
resolved = true
reject(err)
}
resolve(server)
sw.start()
})

server.on('error', (err) => reject(err))
})

const closeHandler = (exitCode) => async () => {
Expand Down
21 changes: 21 additions & 0 deletions app/utils/Errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class TimeoutError extends Error {
constructor(service) {
super()
this.type = this.constructor.name
this.service = service.name
this.message = 'Timeout error, no response from a service'
}
}

class ConnectionError extends Error {
constructor(service) {
super()
this.service = service.name
this.message = 'Connection is not established, will retry during next polling cycle'
}
}

module.exports = {
TimeoutError,
ConnectionError,
}
81 changes: 81 additions & 0 deletions app/utils/ServiceWatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
const { TimeoutError } = require('./Errors')
const { HEALTHCHECK_POLL_PERIOD_MS, HEALTHCHECK_TIMEOUT_MS } = require('../env')

class ServiceWatcher {
#pollPeriod
#timeout

// TODO add a method for updating this.services
constructor(apis) {
this.report = {}
this.#pollPeriod = HEALTHCHECK_POLL_PERIOD_MS
this.#timeout = HEALTHCHECK_TIMEOUT_MS
this.services = this.#init(apis)
}

delay(ms, service = false) {
return new Promise((resolve, reject) => {
setTimeout(() => (service ? reject(new TimeoutError(service)) : resolve()), ms)
})
}

update(name, details = 'unknown') {
if (!name || typeof name !== 'string') return null // some handling
if (this.report[name] === details) return null // no need to update

this.report = {
...this.report,
[name]: details,
}
}

// organize services and store in this.services
#init(services) {
return Object.keys(services)
.map((service) => {
const { healthCheck, ...api } = services[service]
return healthCheck
? {
name: service,
poll: () => healthCheck(api, service),
}
: null
})
.filter(Boolean)
}

// fire and forget, cancel using ServiceWatcher.gen.return()
// or ServiceWatcher.gen.throw(<instance of error>)
start() {
if (this.services.length < 1) return null
this.gen = this.#generator()

const recursive = async (getAll = Promise.resolve([])) => {
try {
const services = await getAll
services.forEach(({ name, ...rest }) => this.update(name, rest))
await this.delay(this.#pollPeriod)
} catch (error) {
// if no service assume that this is server error e.g. TypeError, Parse...
const name = error.service || 'server'
this.update(name, { error, status: 'error' })
}

const { value } = this.gen.next()
recursive(value)
}

const { value } = this.gen.next()
recursive(value)
}

// a generator function that returns poll fn for each service
*#generator() {
while (true)
yield Promise.all(
this.services.map((service) => Promise.race([service.poll(), this.delay(this.#timeout, service)]))
)
}
}

module.exports = ServiceWatcher
4 changes: 2 additions & 2 deletions helm/dscp-ipfs/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
apiVersion: v2
name: dscp-ipfs
appVersion: '2.0.2'
appVersion: '2.0.3'
description: A Helm chart for dscp-ipfs
version: '2.0.2'
version: '2.0.3'
type: application
dependencies:
- name: dscp-node
Expand Down
2 changes: 1 addition & 1 deletion helm/dscp-ipfs/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ config:
image:
repository: ghcr.io/digicatapult/dscp-ipfs
pullPolicy: IfNotPresent
tag: 'v2.0.2'
tag: 'v2.0.3'

storage:
storageClass: ""
Expand Down
Loading

0 comments on commit e18f6d4

Please sign in to comment.