diff --git a/Cargo.lock b/Cargo.lock
index 39e873e4..5fc29dab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -559,7 +559,7 @@ dependencies = [
 
 [[package]]
 name = "apibara-dna-beaconchain"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 dependencies = [
  "alloy-consensus",
  "alloy-eips",
@@ -641,7 +641,7 @@ dependencies = [
 
 [[package]]
 name = "apibara-dna-evm"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 dependencies = [
  "alloy-primitives",
  "alloy-provider",
@@ -691,7 +691,7 @@ dependencies = [
 
 [[package]]
 name = "apibara-dna-starknet"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 dependencies = [
  "apibara-dna-common",
  "apibara-dna-protocol",
diff --git a/beaconchain/Cargo.toml b/beaconchain/Cargo.toml
index 5609df40..df126e1a 100644
--- a/beaconchain/Cargo.toml
+++ b/beaconchain/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "apibara-dna-beaconchain"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 edition.workspace = true
 authors.workspace = true
 repository.workspace = true
diff --git a/common/src/chain.rs b/common/src/chain.rs
index eee6531f..01835a0e 100644
--- a/common/src/chain.rs
+++ b/common/src/chain.rs
@@ -419,6 +419,38 @@ impl CanonicalChainSegment {
         Ok(cursor)
     }
 
+    pub fn siblings(&self, cursor: &Cursor) -> Result<Vec<Cursor>, CanonicalChainError> {
+        if cursor.number < self.info.first_block.number {
+            return Err(CanonicalChainError::View)
+                .attach_printable("cursor is before the first block")
+                .attach_printable_lazy(|| format!("cursor: {cursor:?}"))
+                .attach_printable_lazy(|| format!("first block: {:?}", self.info.first_block));
+        }
+
+        if cursor.number > self.info.last_block.number {
+            // The block could have been reorged while the chain shrunk.
+            let Some(reorgs) = self
+                .extra_reorgs
+                .iter()
+                .find(|r| r.block_number == cursor.number)
+            else {
+                return Err(CanonicalChainError::View)
+                    .attach_printable("cursor is after the last block")
+                    .attach_printable_lazy(|| format!("cursor: {cursor:?}"))
+                    .attach_printable_lazy(|| format!("last block: {:?}", self.info.last_block));
+            };
+
+            let siblings = reorgs.reorgs.values().cloned().collect::<Vec<_>>();
+            return Ok(siblings);
+        }
+
+        let offset = cursor.number - self.info.first_block.number;
+
+        let canonical = &self.canonical[offset as usize];
+        let siblings = canonical.reorgs.values().cloned().collect::<Vec<_>>();
+        Ok(siblings)
+    }
+
     pub fn reconnect(&self, cursor: &Cursor) -> Result<ReconnectAction, CanonicalChainError> {
         if cursor.number < self.info.first_block.number {
             return Err(CanonicalChainError::View)
diff --git a/common/src/chain_view/full.rs b/common/src/chain_view/full.rs
index d0f1ff6c..0fa509b8 100644
--- a/common/src/chain_view/full.rs
+++ b/common/src/chain_view/full.rs
@@ -13,10 +13,19 @@ pub enum CanonicalCursor {
     Canonical(Cursor),
 }
 
+/// Result of validating a cursor.
+#[derive(Debug, Clone)]
+pub enum ValidatedCursor {
+    /// The cursor is valid. The cursor returned is normalized.
+    Valid(Cursor),
+    /// The cursor is invalid. The cursors returned are the canonical cursor and its siblings.
+    Invalid(Cursor, Vec<Cursor>),
+}
+
 #[derive(Debug, Clone)]
 pub enum NextCursor {
     /// Continue streaming from the given cursor.
-    Continue(Cursor),
+    Continue { cursor: Cursor, is_head: bool },
     /// Reorg to the given cursor.
     Invalidate(Cursor),
     /// Nothing to do.
@@ -58,7 +67,10 @@ impl FullCanonicalChain {
     ) -> Result<NextCursor, ChainViewError> {
         let Some(cursor) = cursor else {
             let first_available = self.get_canonical_impl(self.starting_block).await?;
-            return Ok(NextCursor::Continue(first_available));
+            return Ok(NextCursor::Continue {
+                cursor: first_available,
+                is_head: false,
+            });
         };
 
         let segment = self.get_chain_segment(cursor.number).await?;
@@ -69,13 +81,40 @@ impl FullCanonicalChain {
                     return Ok(NextCursor::AtHead);
                 }
                 let next_available = self.get_canonical_impl(cursor.number + 1).await?;
-                Ok(NextCursor::Continue(next_available))
+                Ok(NextCursor::Continue {
+                    is_head: next_available.number == self.recent.info.last_block.number,
+                    cursor: next_available,
+                })
             }
             ReconnectAction::OfflineReorg(target) => Ok(NextCursor::Invalidate(target)),
             ReconnectAction::Unknown => Err(ChainViewError).attach_printable("unknown cursor"),
         }
     }
 
+    pub async fn validate_cursor(
+        &self,
+        cursor: &Cursor,
+    ) -> Result<ValidatedCursor, ChainViewError> {
+        let segment = self.get_chain_segment(cursor.number).await?;
+        let canonical = segment
+            .canonical(cursor.number)
+            .change_context(ChainViewError)?;
+
+        if canonical.is_equivalent(cursor) {
+            return Ok(ValidatedCursor::Valid(canonical.clone()));
+        }
+
+        // Iterate over the siblings because the user may have provided a malformed cursor that was reorged.
+        let siblings = segment.siblings(cursor).change_context(ChainViewError)?;
+        for sibling in siblings.iter() {
+            if sibling.is_equivalent(cursor) {
+                return Ok(ValidatedCursor::Valid(sibling.clone()));
+            }
+        }
+
+        Ok(ValidatedCursor::Invalid(canonical, siblings))
+    }
+
     pub async fn get_head(&self) -> Result<Cursor, ChainViewError> {
         Ok(self.recent.info.last_block.clone())
     }
diff --git a/common/src/chain_view/mod.rs b/common/src/chain_view/mod.rs
index 8a8e4b33..4307fe7c 100644
--- a/common/src/chain_view/mod.rs
+++ b/common/src/chain_view/mod.rs
@@ -4,6 +4,6 @@ mod sync;
 mod view;
 
 pub use self::error::ChainViewError;
-pub use self::full::{CanonicalCursor, NextCursor};
+pub use self::full::{CanonicalCursor, NextCursor, ValidatedCursor};
 pub use self::sync::{chain_view_sync_loop, ChainViewSyncService};
 pub use self::view::ChainView;
diff --git a/common/src/chain_view/view.rs b/common/src/chain_view/view.rs
index 9b0e405d..d0ace009 100644
--- a/common/src/chain_view/view.rs
+++ b/common/src/chain_view/view.rs
@@ -7,7 +7,7 @@ use crate::Cursor;
 
 use super::{
     error::ChainViewError,
-    full::{FullCanonicalChain, NextCursor},
+    full::{FullCanonicalChain, NextCursor, ValidatedCursor},
     CanonicalCursor,
 };
 
@@ -106,6 +106,14 @@ impl ChainView {
         inner.canonical.get_next_cursor(cursor).await
     }
 
+    pub async fn validate_cursor(
+        &self,
+        cursor: &Cursor,
+    ) -> Result<ValidatedCursor, ChainViewError> {
+        let inner = self.0.read().await;
+        inner.canonical.validate_cursor(cursor).await
+    }
+
     pub async fn get_canonical(
         &self,
         block_number: u64,
diff --git a/common/src/compaction/group.rs b/common/src/compaction/group.rs
index d657360b..71a515f5 100644
--- a/common/src/compaction/group.rs
+++ b/common/src/compaction/group.rs
@@ -60,7 +60,7 @@ impl SegmentGroupService {
             .await
             .change_context(CompactionError)?
         {
-            let NextCursor::Continue(cursor) = chain_view
+            let NextCursor::Continue { cursor, .. } = chain_view
                 .get_next_cursor(&Some(cursor.clone()))
                 .await
                 .change_context(CompactionError)?
diff --git a/common/src/compaction/segment.rs b/common/src/compaction/segment.rs
index dd83fa18..b52977f1 100644
--- a/common/src/compaction/segment.rs
+++ b/common/src/compaction/segment.rs
@@ -51,7 +51,7 @@ impl SegmentService {
             .await
             .change_context(CompactionError)?
         {
-            let NextCursor::Continue(cursor) = chain_view
+            let NextCursor::Continue { cursor, .. } = chain_view
                 .get_next_cursor(&Some(cursor.clone()))
                 .await
                 .change_context(CompactionError)?
@@ -115,7 +115,10 @@ impl SegmentService {
             .attach_printable("failed to add block to segment")
             .attach_printable_lazy(|| format!("cursor: {current}"))?;
 
-        let NextCursor::Continue(next_cursor) = chain_view
+        let NextCursor::Continue {
+            cursor: next_cursor,
+            ..
+        } = chain_view
             .get_next_cursor(&Some(current.clone()))
             .await
             .change_context(CompactionError)?
diff --git a/common/src/core.rs b/common/src/core.rs
index f5a12312..dc55fb67 100644
--- a/common/src/core.rs
+++ b/common/src/core.rs
@@ -5,6 +5,16 @@ use rkyv::{Archive, Deserialize, Serialize};
 #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Archive, Serialize, Deserialize, Default)]
 pub struct Hash(pub Vec<u8>);
 
+impl Hash {
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
 /// Cursor uniquely identifies a block by its number and hash.
 #[derive(Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)]
 pub struct Cursor {
@@ -47,6 +57,34 @@ impl Cursor {
     pub fn hash_as_hex(&self) -> String {
         format!("{}", self.hash)
     }
+
+    pub fn is_equivalent(&self, other: &Self) -> bool {
+        if self.number != other.number {
+            return false;
+        }
+
+        if self.hash.is_empty() || other.hash.is_empty() {
+            return true;
+        }
+
+        if self.hash.len() == other.hash.len() {
+            return self.hash == other.hash;
+        }
+
+        // Check that the two hashes end with the same bytes and that the longer hash extra bytes are all zero.
+        let min_len = std::cmp::min(self.hash.len(), other.hash.len());
+
+        let prefix = &self.hash.0[self.hash.len() - min_len..];
+        let other_prefix = &other.hash.0[other.hash.len() - min_len..];
+
+        if self.hash.len() > other.hash.len() {
+            let len_diff = self.hash.len() - other.hash.len();
+            return other_prefix == prefix && self.hash.0[0..len_diff] == vec![0; len_diff];
+        }
+
+        let len_diff = other.hash.len() - self.hash.len();
+        prefix == other_prefix && other.hash.0[0..len_diff] == vec![0; len_diff]
+    }
 }
 
 impl Hash {
@@ -124,3 +162,62 @@ pub mod testing {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::Hash;
+
+    use super::Cursor;
+
+    #[test]
+    fn test_cursor_is_equivalent() {
+        {
+            let cursor = Cursor::new(1, Hash::default());
+            let other = Cursor::new(1, Hash([0, 0, 1].to_vec()));
+            assert!(cursor.is_equivalent(&other));
+            assert!(other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash::default());
+            let other = Cursor::new(1, Hash::default());
+            assert!(cursor.is_equivalent(&other));
+            assert!(other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash::default());
+            let other = Cursor::new(2, Hash::default());
+            assert!(!cursor.is_equivalent(&other));
+            assert!(!other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
+            let other = Cursor::new(1, Hash([0, 1].to_vec()));
+            assert!(cursor.is_equivalent(&other));
+            assert!(other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash([0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4].to_vec()));
+            let other = Cursor::new(1, Hash([1, 2, 3, 4].to_vec()));
+            assert!(cursor.is_equivalent(&other));
+            assert!(other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
+            let other = Cursor::new(1, Hash([0, 0, 2].to_vec()));
+            assert!(!cursor.is_equivalent(&other));
+            assert!(!other.is_equivalent(&cursor));
+        }
+
+        {
+            let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
+            let other = Cursor::new(1, Hash([0, 0, 1, 1].to_vec()));
+            assert!(!cursor.is_equivalent(&other));
+            assert!(!other.is_equivalent(&cursor));
+        }
+    }
+}
diff --git a/common/src/data_stream/stream.rs b/common/src/data_stream/stream.rs
index e2bc723b..9a007418 100644
--- a/common/src/data_stream/stream.rs
+++ b/common/src/data_stream/stream.rs
@@ -4,8 +4,8 @@ use std::{
 };
 
 use apibara_dna_protocol::dna::stream::{
-    stream_data_response::Message, Data, DataFinality, Finalize, Heartbeat, Invalidate,
-    StreamDataResponse,
+    stream_data_response::Message, Data, DataFinality, DataProduction, Finalize, Heartbeat,
+    Invalidate, StreamDataResponse,
 };
 use bytes::{BufMut, Bytes, BytesMut};
 use error_stack::{Result, ResultExt};
@@ -107,13 +107,13 @@ impl DataStream {
         tx: &mpsc::Sender,
         ct: &CancellationToken,
     ) -> Result<(), DataStreamError> {
-        let next_cursor = match self
+        let (next_cursor, is_head) = match self
            .chain_view
            .get_next_cursor(&self.current)
            .await
            .change_context(DataStreamError)?
         {
-            NextCursor::Continue(cursor) => cursor,
+            NextCursor::Continue { cursor, is_head } => (cursor, is_head),
             NextCursor::Invalidate(cursor) => {
                 debug!(cursor = %cursor, "invalidating data");
 
@@ -230,7 +230,7 @@ impl DataStream {
             return self.tick_segment(next_cursor, tx, ct).await;
         }
 
-        self.tick_single(next_cursor, tx, ct).await
+        self.tick_single(next_cursor, is_head, tx, ct).await
     }
 
     async fn send_heartbeat_message(
@@ -444,6 +444,7 @@ impl DataStream {
             end_cursor: proto_end_cursor.clone(),
             data: blocks,
             finality: finality as i32,
+            production: DataProduction::Backfill.into(),
         });
 
         let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
@@ -551,6 +552,7 @@ impl DataStream {
             end_cursor: proto_end_cursor.clone(),
             data: blocks,
             finality: finality as i32,
+            production: DataProduction::Backfill.into(),
         });
 
         let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
@@ -572,6 +574,7 @@ impl DataStream {
     async fn tick_single(
         &mut self,
         cursor: Cursor,
+        is_head: bool,
         tx: &mpsc::Sender,
         ct: &CancellationToken,
     ) -> Result<(), DataStreamError> {
@@ -599,6 +602,7 @@ impl DataStream {
         let fragment_access = FragmentAccess::new_in_block(self.store.clone(), cursor.clone());
 
         let mut blocks = Vec::new();
+
         if self
             .filter_fragment(&fragment_access, &finality, &mut blocks)
             .await?
@@ -608,6 +612,11 @@
                 end_cursor: proto_end_cursor.clone(),
                 data: blocks,
                 finality: finality.into(),
+                production: if is_head {
+                    DataProduction::Live.into()
+                } else {
+                    DataProduction::Backfill.into()
+                },
             });
 
             let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
@@ -671,6 +680,7 @@
             end_cursor: proto_end_cursor.clone(),
             data: blocks,
             finality: finality.into(),
+            production: DataProduction::Live.into(),
         });
 
         let Some(Ok(permit)) = ct.run_until_cancelled(tx.reserve()).await else {
diff --git a/common/src/server/service.rs b/common/src/server/service.rs
index ed78f16e..fd964643 100644
--- a/common/src/server/service.rs
+++ b/common/src/server/service.rs
@@ -9,11 +9,11 @@ use futures::{Future, Stream, StreamExt, TryFutureExt};
 use tokio::sync::{mpsc, Semaphore};
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info};
+use tracing::{debug, error, info};
 
 use crate::{
     block_store::BlockStoreReader,
-    chain_view::{CanonicalCursor, ChainView, ChainViewError},
+    chain_view::{CanonicalCursor, ChainView, ChainViewError, ValidatedCursor},
     data_stream::{BlockFilterFactory, DataStream},
     fragment::FragmentId,
     Cursor,
@@ -143,8 +143,29 @@ where
         // The block could be reorged but that's handled by the `DataStream`.
         let starting_cursor = if let Some(cursor) = request.starting_cursor {
             let cursor = Cursor::from(cursor);
+            debug!(cursor = %cursor, "starting cursor before validation");
             chain_view.ensure_cursor_in_range(&cursor).await?;
-            cursor.into()
+            match chain_view.validate_cursor(&cursor).await {
+                Ok(ValidatedCursor::Valid(cursor)) => Some(cursor),
+                Ok(ValidatedCursor::Invalid(canonical, siblings)) => {
+                    let sibling_hashes = if siblings.is_empty() {
+                        "none".to_string()
+                    } else {
+                        siblings
+                            .iter()
+                            .map(|c| c.hash_as_hex())
+                            .collect::<Vec<_>>()
+                            .join(", ")
+                    };
+                    return Err(tonic::Status::invalid_argument(format!(
+                        "starting cursor {cursor} not found. canonical: {}, reorged: {sibling_hashes}",
+                        canonical.hash_as_hex()
+                    )));
+                }
+                Err(_) => {
+                    return Err(tonic::Status::internal("internal server error"));
+                }
+            }
         } else {
             None
         };
diff --git a/evm/Cargo.toml b/evm/Cargo.toml
index 6b73ac90..a8084c80 100644
--- a/evm/Cargo.toml
+++ b/evm/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "apibara-dna-evm"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 edition.workspace = true
 authors.workspace = true
 repository.workspace = true
diff --git a/protocol/proto/dna/v2/stream.proto b/protocol/proto/dna/v2/stream.proto
index adb38907..f98e06a3 100644
--- a/protocol/proto/dna/v2/stream.proto
+++ b/protocol/proto/dna/v2/stream.proto
@@ -106,6 +106,8 @@ message Data {
   //
   // This message contains chain-specific data serialized using protobuf.
   repeated bytes data = 4;
+  // The production mode of the block.
+  DataProduction production = 5;
 }
 
 // Sent to clients to check if stream is still connected.
@@ -131,3 +133,12 @@
   // Data is finalized and cannot be invalidated.
   DATA_FINALITY_FINALIZED = 3;
 }
+
+// Data production mode.
+enum DataProduction {
+  DATA_PRODUCTION_UNKNOWN = 0;
+  // Data is for a backfilled block.
+  DATA_PRODUCTION_BACKFILL = 1;
+  // Data is for a live block.
+  DATA_PRODUCTION_LIVE = 2;
+}
diff --git a/starknet/Cargo.toml b/starknet/Cargo.toml
index daeb994e..d4b55d7f 100644
--- a/starknet/Cargo.toml
+++ b/starknet/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "apibara-dna-starknet"
-version = "2.0.0-beta.11"
+version = "2.0.0-beta.12"
 edition.workspace = true
 authors.workspace = true
 repository.workspace = true
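
Usage sketch (illustrative only, not part of the patch above): how a stream consumer might branch on the new `production` field of the `Data` message. It assumes the prost-generated bindings in `apibara_dna_protocol::dna::stream`; the `production()` accessor and the `i32` wire representation follow the usual prost conventions and are an assumption here, not something confirmed by this change.

    use apibara_dna_protocol::dna::stream::{Data, DataProduction};

    /// Returns true when the server marked the block as produced at the head
    /// of the chain; backfilled and unknown values are treated as historical.
    fn is_live_block(data: &Data) -> bool {
        // `production` is encoded as an i32 on the wire; the prost-generated
        // accessor falls back to DATA_PRODUCTION_UNKNOWN for values this
        // client does not recognize (e.g. from a newer server).
        data.production() == DataProduction::Live
    }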