diff --git a/common/changes/@subsquid/substrate-processor/portal-api_2024-12-06-06-12.json b/common/changes/@subsquid/substrate-processor/portal-api_2024-12-06-06-12.json new file mode 100644 index 000000000..0cddc0429 --- /dev/null +++ b/common/changes/@subsquid/substrate-processor/portal-api_2024-12-06-06-12.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@subsquid/substrate-processor", + "comment": "add portal api support", + "type": "minor" + } + ], + "packageName": "@subsquid/substrate-processor" +} \ No newline at end of file diff --git a/evm/evm-processor/src/ds-archive/portal.ts b/evm/evm-processor/src/ds-archive/portal.ts index 6c0895e62..f0f216040 100644 --- a/evm/evm-processor/src/ds-archive/portal.ts +++ b/evm/evm-processor/src/ds-archive/portal.ts @@ -79,21 +79,15 @@ export class EvmPortal implements DataSource { ): AsyncIterable> { let height = new Throttler(() => this.client.getHeight(), 20_000) - let top = await height.get() + let top = await height.call() for (let req of requests) { - let from = req.range.from - let to = req.range.to - if (top < from && stopOnHead) return + let lastBlock = req.range.from - 1 + let endBlock = req.range.to || Infinity + let query = makeQuery(req) - let query = makeQuery({ - ...req, - range: {from, to}, - }) for await (let batch of this.client.stream(query, stopOnHead)) { assert(batch.length > 0, 'boundary blocks are expected to be included') - let lastBlock = last(batch).header.number - assert(lastBlock >= from) - from = lastBlock + 1 + lastBlock = last(batch).header.number let blocks = batch.map((b) => { try { @@ -108,11 +102,19 @@ export class EvmPortal implements DataSource { yield { blocks, - isHead: from > top, + isHead: lastBlock > top, } top = await height.get() } + + // stream ended before requested range, + // which means we reached the last available block + // should not happen if stopOnHead is set to false + if (lastBlock <= endBlock) { + assert(stopOnHead, 'unexpected end of stream') + break + } } } } diff --git a/evm/evm-processor/src/processor.ts b/evm/evm-processor/src/processor.ts index ab66d34be..e5b91a36b 100644 --- a/evm/evm-processor/src/processor.ts +++ b/evm/evm-processor/src/processor.ts @@ -223,7 +223,7 @@ export class EvmBatchProcessor { * processor.setGateway('https://v2.archive.subsquid.io/network/ethereum-mainnet') */ setGateway(url: string | GatewaySettings): this { - assert(this.archive?.type !== 'gateway', 'setGateway() can not be used together with setPortal()') + assert(this.archive?.type !== 'gateway', '.setGateway() can not be used together with setPortal()') this.assertNotRunning() if (typeof url == 'string') { this.archive = {type: 'gateway', url} @@ -233,9 +233,8 @@ export class EvmBatchProcessor { return this } - setPortal(url: string | PortalSettings): this { - assert(this.archive?.type !== 'gateway', 'setPortal() can not be used together with setGateway()') + assert(this.archive?.type !== 'gateway', '.setPortal() can not be used together with setGateway()') this.assertNotRunning() if (typeof url == 'string') { this.archive = {type: 'portal', url} @@ -245,7 +244,6 @@ export class EvmBatchProcessor { return this } - /** * Set chain RPC endpoint * diff --git a/substrate/substrate-processor/package.json b/substrate/substrate-processor/package.json index f8dcc9a58..d9979212a 100644 --- a/substrate/substrate-processor/package.json +++ b/substrate/substrate-processor/package.json @@ -19,6 +19,7 @@ "@subsquid/http-client": "^1.5.0", "@subsquid/logger": "^1.3.3", "@subsquid/rpc-client": "^4.9.0", + "@subsquid/portal-client": "^0.0.0", "@subsquid/substrate-data": "^4.2.1", "@subsquid/substrate-data-raw": "^1.2.0", "@subsquid/util-internal": "^3.2.0", diff --git a/substrate/substrate-processor/src/ds-archive.ts b/substrate/substrate-processor/src/ds-archive.ts index 239efe0d2..550130559 100644 --- a/substrate/substrate-processor/src/ds-archive.ts +++ b/substrate/substrate-processor/src/ds-archive.ts @@ -10,6 +10,7 @@ import {DEFAULT_FIELDS, FieldSelection} from './interfaces/data' import {ArchiveBlock, ArchiveBlockHeader} from './interfaces/data-partial' import {DataRequest} from './interfaces/data-request' import {Block, BlockHeader, Call, Event, Extrinsic, setUpItems} from './mapping' +import {mergeFields} from './selection' interface ArchiveQuery extends DataRequest { @@ -44,7 +45,6 @@ export class SubstrateArchive implements DataSource { } async *getFinalizedBlocks(requests: RangeRequestList, stopOnHead?: boolean): AsyncIterable> { - let runtimeTracker = new RuntimeTracker( this.rpc, hdr => ({height: hdr.number, hash: hdr.hash, parentHash: hdr.parentHash}), @@ -80,108 +80,8 @@ export class SubstrateArchive implements DataSource { @annotateSyncError((src: ArchiveBlock) => ({blockHeight: src.header.number, blockHash: src.header.hash})) private mapBlock(src: ArchiveBlock): Block { - let block = new Block(new BlockHeader( - assertNotNull(src.header.runtime), - assertNotNull(src.header.runtimeOfPrevBlock), - { - height: src.header.number, - ...src.header - } - )) - - if (src.extrinsics) { - for (let s of src.extrinsics) { - let extrinsic = new Extrinsic(block.header, s.index) - if (s.version != null) { - extrinsic.version = s.version - } - if (s.signature != null) { - extrinsic.signature = s.signature - } - if (s.fee != null) { - extrinsic.fee = BigInt(s.fee) - } - if (s.tip != null) { - extrinsic.tip = BigInt(s.tip) - } - if (s.error != null) { - extrinsic.error = s.error - } - if (s.success != null) { - extrinsic.success = s.success - } - if (s.hash != null) { - extrinsic.hash = s.hash - } - block.extrinsics.push(extrinsic) - } - } - - if (src.calls) { - for (let s of src.calls) { - let call = new Call(block.header, s.extrinsicIndex, s.address) - if (s.name) { - call.name = s.name - } - if (s.args != null) { - call.args = s.args - } - if (s.origin != null) { - call.origin = s.origin - } - if (s.error != null) { - call.error = s.error - } - if (s.success != null) { - call.success = s.success - } - block.calls.push(call) - } - } - - if (src.events) { - for (let s of src.events) { - let event = new Event(block.header, s.index) - if (s.name != null) { - event.name = s.name - } - if (s.args != null) { - event.args = s.args - } - if (s.phase != null) { - event.phase = s.phase - } - if (s.extrinsicIndex != null) { - event.extrinsicIndex = s.extrinsicIndex - } - if (s.callAddress != null) { - event.callAddress = s.callAddress - } - if (s.topics != null) { - event.topics = s.topics - } - block.events.push(event) - } - } - - setUpItems(block) - return block - } -} - - -type Selector = { - [K in Keys]?: boolean -} - - -function mergeFields(def: Selector, requested?: Selector, required?: Selector): Selector { - let fields: Selector = {...def} - for (let key in requested) { - fields[key] = requested[key] + return mapBlock(src) } - Object.assign(fields, required) - return fields } @@ -198,3 +98,93 @@ function getFields(fields: FieldSelection | undefined): FieldSelection { extrinsic: mergeFields(DEFAULT_FIELDS.extrinsic, fields?.extrinsic) } } + + +export function mapBlock(src: ArchiveBlock): Block { + let block = new Block(new BlockHeader( + assertNotNull(src.header.runtime), + assertNotNull(src.header.runtimeOfPrevBlock), + { + height: src.header.number, + ...src.header + } + )) + + if (src.extrinsics) { + for (let s of src.extrinsics) { + let extrinsic = new Extrinsic(block.header, s.index) + if (s.version != null) { + extrinsic.version = s.version + } + if (s.signature != null) { + extrinsic.signature = s.signature + } + if (s.fee != null) { + extrinsic.fee = BigInt(s.fee) + } + if (s.tip != null) { + extrinsic.tip = BigInt(s.tip) + } + if (s.error != null) { + extrinsic.error = s.error + } + if (s.success != null) { + extrinsic.success = s.success + } + if (s.hash != null) { + extrinsic.hash = s.hash + } + block.extrinsics.push(extrinsic) + } + } + + if (src.calls) { + for (let s of src.calls) { + let call = new Call(block.header, s.extrinsicIndex, s.address) + if (s.name) { + call.name = s.name + } + if (s.args != null) { + call.args = s.args + } + if (s.origin != null) { + call.origin = s.origin + } + if (s.error != null) { + call.error = s.error + } + if (s.success != null) { + call.success = s.success + } + block.calls.push(call) + } + } + + if (src.events) { + for (let s of src.events) { + let event = new Event(block.header, s.index) + if (s.name != null) { + event.name = s.name + } + if (s.args != null) { + event.args = s.args + } + if (s.phase != null) { + event.phase = s.phase + } + if (s.extrinsicIndex != null) { + event.extrinsicIndex = s.extrinsicIndex + } + if (s.callAddress != null) { + event.callAddress = s.callAddress + } + if (s.topics != null) { + event.topics = s.topics + } + block.events.push(event) + } + } + + setUpItems(block) + return block +} \ No newline at end of file diff --git a/substrate/substrate-processor/src/ds-portal.ts b/substrate/substrate-processor/src/ds-portal.ts new file mode 100644 index 000000000..2b93b7d79 --- /dev/null +++ b/substrate/substrate-processor/src/ds-portal.ts @@ -0,0 +1,145 @@ +import {addErrorContext, annotateSyncError, last, Throttler} from '@subsquid/util-internal' +import {Batch, DataSource} from '@subsquid/util-internal-processor-tools' +import {RangeRequest} from '@subsquid/util-internal-range' +import assert from 'assert' +import {mapBlock} from './ds-archive' +import {PortalClient} from '@subsquid/portal-client' +import {Rpc, RuntimeTracker, WithRuntime} from '@subsquid/substrate-data' +import {ArchiveBlock, ArchiveBlockHeader} from './interfaces/data-partial' +import {RpcClient} from '@subsquid/rpc-client' +import {OldSpecsBundle, OldTypesBundle} from '@subsquid/substrate-runtime' +import {DataRequest} from './interfaces/data-request' +import {Block} from './mapping' +import {assertIsValid, IsInvalid} from '@subsquid/util-internal-ingest-tools' +import {DEFAULT_FIELDS, FieldSelection} from './interfaces/data' +import {mergeFields} from './selection' + +function getFields(fields: FieldSelection | undefined): FieldSelection { + return { + block: mergeFields(DEFAULT_FIELDS.block, fields?.block, { + number: true, + hash: true, + parentHash: true, + specName: true, + specVersion: true, + implName: true, + implVersion: true, + }), + event: mergeFields(DEFAULT_FIELDS.event, fields?.event, { + index: true, + extrinsicIndex: true, + callAddress: true, + }), + call: mergeFields(DEFAULT_FIELDS.call, fields?.call, { + extrinsicIndex: true, + address: true, + }), + extrinsic: mergeFields(DEFAULT_FIELDS.extrinsic, fields?.extrinsic, { + index: true + }), + } +} + +function makeQuery(req: RangeRequest) { + let {fields, ...request} = req.request + + return { + type: 'substrate', + fromBlock: req.range.from, + toBlock: req.range.to, + fields: getFields(fields), + ...request, + } +} + +export interface SubstratePortalOptions { + client: PortalClient + rpc: RpcClient + typesBundle?: OldTypesBundle | OldSpecsBundle +} + +export class SubstratePortal implements DataSource { + private client: PortalClient + private rpc: Rpc + private typesBundle?: OldTypesBundle | OldSpecsBundle + + constructor(options: SubstratePortalOptions) { + this.client = options.client + this.rpc = new Rpc(options.rpc) + this.typesBundle = options.typesBundle + } + + getFinalizedHeight(): Promise { + return this.client.getHeight() + } + + async getBlockHash(height: number): Promise { + let query = makeQuery({ + range: {from: height, to: height}, + request: {includeAllBlocks: true}, + }) + let blocks = await this.client.query(query) + return blocks[0]?.header?.hash || null + } + + async *getFinalizedBlocks( + requests: RangeRequest[], + stopOnHead?: boolean | undefined + ): AsyncIterable> { + let height = new Throttler(() => this.client.getHeight(), 20_000) + + let runtimeTracker = new RuntimeTracker( + this.rpc, + (hdr) => ({height: hdr.number, hash: hdr.hash, parentHash: hdr.parentHash}), + (hdr) => hdr, + this.typesBundle + ) + + let top = await height.call() + for (let req of requests) { + let lastBlock = req.range.from - 1 + let endBlock = req.range.to || Infinity + let query = makeQuery(req) + + for await (let batch of this.client.stream(query, stopOnHead)) { + assert(batch.length > 0, 'boundary blocks are expected to be included') + lastBlock = last(batch).header.number + + let headers: (ArchiveBlockHeader & IsInvalid)[] = batch.map((b) => b.header) + await runtimeTracker.setRuntime(headers) + assertIsValid(headers) + + let blocks = batch.map((b) => { + try { + return this.mapBlock(b) + } catch (err: any) { + throw addErrorContext(err, { + blockHeight: b.header.number, + blockHash: b.header.hash, + }) + } + }) + + yield { + blocks, + isHead: lastBlock > top, + } + + top = await height.get() + } + + // stream ended before requested range, + // which means we reached the last available block + // should not happen if stopOnHead is set to false + if (lastBlock <= endBlock) { + assert(stopOnHead, 'unexpected end of stream') + break + } + } + } + + @annotateSyncError((src: ArchiveBlock) => ({blockHeight: src.header.number, blockHash: src.header.hash})) + private mapBlock(src: ArchiveBlock): Block { + return mapBlock(src) + } +} diff --git a/substrate/substrate-processor/src/processor.ts b/substrate/substrate-processor/src/processor.ts index 1349eb08a..c8a40fd46 100644 --- a/substrate/substrate-processor/src/processor.ts +++ b/substrate/substrate-processor/src/processor.ts @@ -32,6 +32,8 @@ import { GearUserMessageSentRequest } from './interfaces/data-request' import {getFieldSelectionValidator} from './selection' +import {SubstratePortal} from './ds-portal' +import {PortalClient} from '@subsquid/portal-client' export interface RpcEndpointSettings { @@ -94,6 +96,22 @@ export interface GatewaySettings { } +export interface PortalSettings { + /** + * Subsquid Network Gateway url + */ + url: string + /** + * Request timeout in ms + */ + requestTimeout?: number + + bufferThreshold?: number + + newBlockTimeout?: number +} + + /** * @deprecated */ @@ -160,7 +178,7 @@ export class SubstrateBatchProcessor { private fields?: FieldSelection private blockRange?: Range private finalityConfirmation?: number - private archive?: GatewaySettings + private archive?: GatewaySettings & {type: 'gateway'} | PortalSettings & {type: 'portal'} private rpcEndpoint?: RpcEndpointSettings private rpcIngestSettings?: RpcDataIngestionSettings private typesBundle?: OldTypesBundle | OldSpecsBundle @@ -184,11 +202,23 @@ export class SubstrateBatchProcessor { * processor.setGateway('https://v2.archive.subsquid.io/network/kusama') */ setGateway(url: string | GatewaySettings): this { + assert(this.archive?.type !== 'gateway', '.setGateway() can not be used together with setPortal()') this.assertNotRunning() if (typeof url == 'string') { - this.archive = {url} + this.archive = {type: 'gateway', url} } else { - this.archive = url + this.archive = {type: 'gateway', ...url} + } + return this + } + + setPortal(url: string | PortalSettings): this { + assert(this.archive?.type !== 'gateway', '.setPortal() can not be used together with setGateway()') + this.assertNotRunning() + if (typeof url == 'string') { + this.archive = {type: 'portal', url} + } else { + this.archive = {type: 'portal', ...url} } return this } @@ -475,7 +505,7 @@ export class SubstrateBatchProcessor { } @def - private getArchiveDataSource(): SubstrateArchive { + private getArchiveDataSource(): SubstrateArchive | SubstratePortal { let options = assertNotNull(this.archive) let log = this.getLogger().child('archive') @@ -490,16 +520,29 @@ export class SubstrateBatchProcessor { log }) - return new SubstrateArchive({ - client: new ArchiveClient({ - http, - url: options.url, - queryTimeout: options.requestTimeout, - log - }), - rpc: this.getChainRpcClient(), - typesBundle: this.typesBundle - }) + return options.type === 'gateway' + ? new SubstrateArchive({ + client: new ArchiveClient({ + http, + url: options.url, + queryTimeout: options.requestTimeout, + log + }), + rpc: this.getChainRpcClient(), + typesBundle: this.typesBundle + }) + : new SubstratePortal({ + client: new PortalClient({ + http, + url: options.url, + queryTimeout: options.requestTimeout, + bufferThreshold: options.bufferThreshold, + newBlockTimeout: options.newBlockTimeout, + log, + }), + rpc: this.getChainRpcClient(), + typesBundle: this.typesBundle + }) } @def diff --git a/substrate/substrate-processor/src/selection.ts b/substrate/substrate-processor/src/selection.ts index cc76b6286..311505fcc 100644 --- a/substrate/substrate-processor/src/selection.ts +++ b/substrate/substrate-processor/src/selection.ts @@ -64,4 +64,19 @@ export function getFieldSelectionValidator() { call: option(getCallSelectionValidator()), event: option(getEventSelectionValidator()), }) +} + + +type Selector = { + [K in Keys]?: boolean +} + + +export function mergeFields(def: Selector, requested?: Selector, required?: Selector): Selector { + let fields: Selector = {...def} + for (let key in requested) { + fields[key] = requested[key] + } + Object.assign(fields, required) + return fields } \ No newline at end of file diff --git a/test/balances/src/processor.ts b/test/balances/src/processor.ts index e8ddb7c17..90d258e9e 100644 --- a/test/balances/src/processor.ts +++ b/test/balances/src/processor.ts @@ -8,14 +8,17 @@ import {events} from './types' const processor = new SubstrateBatchProcessor() - .setGateway('https://v2.archive.subsquid.io/network/kusama') + .setPortal('https://portal.sqd.dev/datasets/kusama') .setRpcEndpoint(process.env.KUSAMA_NODE_WS || 'wss://kusama-rpc.polkadot.io') + .setRpcDataIngestionSettings({ + disabled: true, + }) .setFields({ block: { timestamp: true } }) - .setBlockRange({from: 19_666_100}) + .setBlockRange({from: 0}) .addEvent({ name: [events.balances.transfer.name] })