Skip to content

Commit

Permalink
Improve starting cursor handling (#402)
Browse files Browse the repository at this point in the history
### Summary

This PR improves on the starting cursor handling by:

- be less strict when comparing cursors by ignoring any 0 prefix in the
hash.
 - return the hash of the canonical block on error.
 - signal to user whether the data is a backfill or live block.
  • Loading branch information
fracek authored Jan 18, 2025
2 parents 09d56e5 + aa3d7ab commit b480d68
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 22 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion beaconchain/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "apibara-dna-beaconchain"
version = "2.0.0-beta.11"
version = "2.0.0-beta.12"
edition.workspace = true
authors.workspace = true
repository.workspace = true
Expand Down
32 changes: 32 additions & 0 deletions common/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,38 @@ impl CanonicalChainSegment {
Ok(cursor)
}

pub fn siblings(&self, cursor: &Cursor) -> Result<Vec<Cursor>, CanonicalChainError> {
if cursor.number < self.info.first_block.number {
return Err(CanonicalChainError::View)
.attach_printable("cursor is before the first block")
.attach_printable_lazy(|| format!("cursor: {cursor:?}"))
.attach_printable_lazy(|| format!("first block: {:?}", self.info.first_block));
}

if cursor.number > self.info.last_block.number {
// The block could have been reorged while the chain shrunk.
let Some(reorgs) = self
.extra_reorgs
.iter()
.find(|r| r.block_number == cursor.number)
else {
return Err(CanonicalChainError::View)
.attach_printable("cursor is after the last block")
.attach_printable_lazy(|| format!("cursor: {cursor:?}"))
.attach_printable_lazy(|| format!("last block: {:?}", self.info.last_block));
};

let siblings = reorgs.reorgs.values().cloned().collect::<Vec<_>>();
return Ok(siblings);
}

let offset = cursor.number - self.info.first_block.number;

let canonical = &self.canonical[offset as usize];
let siblings = canonical.reorgs.values().cloned().collect::<Vec<_>>();
Ok(siblings)
}

pub fn reconnect(&self, cursor: &Cursor) -> Result<ReconnectAction, CanonicalChainError> {
if cursor.number < self.info.first_block.number {
return Err(CanonicalChainError::View)
Expand Down
45 changes: 42 additions & 3 deletions common/src/chain_view/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ pub enum CanonicalCursor {
Canonical(Cursor),
}

/// Result of validating a cursor.
#[derive(Debug, Clone)]
pub enum ValidatedCursor {
/// The cursor is valid. The cursor returned is normalized.
Valid(Cursor),
/// The cursor is invalid. The cursors returned are the canonical cursor and its siblings.
Invalid(Cursor, Vec<Cursor>),
}

#[derive(Debug, Clone)]
pub enum NextCursor {
/// Continue streaming from the given cursor.
Continue(Cursor),
Continue { cursor: Cursor, is_head: bool },
/// Reorg to the given cursor.
Invalidate(Cursor),
/// Nothing to do.
Expand Down Expand Up @@ -58,7 +67,10 @@ impl FullCanonicalChain {
) -> Result<NextCursor, ChainViewError> {
let Some(cursor) = cursor else {
let first_available = self.get_canonical_impl(self.starting_block).await?;
return Ok(NextCursor::Continue(first_available));
return Ok(NextCursor::Continue {
cursor: first_available,
is_head: false,
});
};

let segment = self.get_chain_segment(cursor.number).await?;
Expand All @@ -69,13 +81,40 @@ impl FullCanonicalChain {
return Ok(NextCursor::AtHead);
}
let next_available = self.get_canonical_impl(cursor.number + 1).await?;
Ok(NextCursor::Continue(next_available))
Ok(NextCursor::Continue {
is_head: next_available.number == self.recent.info.last_block.number,
cursor: next_available,
})
}
ReconnectAction::OfflineReorg(target) => Ok(NextCursor::Invalidate(target)),
ReconnectAction::Unknown => Err(ChainViewError).attach_printable("unknown cursor"),
}
}

pub async fn validate_cursor(
&self,
cursor: &Cursor,
) -> Result<ValidatedCursor, ChainViewError> {
let segment = self.get_chain_segment(cursor.number).await?;
let canonical = segment
.canonical(cursor.number)
.change_context(ChainViewError)?;

if canonical.is_equivalent(cursor) {
return Ok(ValidatedCursor::Valid(canonical.clone()));
}

// Iterate over the siblings because the user may have provided a malformed cursor that was reorged.
let siblings = segment.siblings(cursor).change_context(ChainViewError)?;
for sibling in siblings.iter() {
if sibling.is_equivalent(cursor) {
return Ok(ValidatedCursor::Valid(sibling.clone()));
}
}

Ok(ValidatedCursor::Invalid(canonical, siblings))
}

pub async fn get_head(&self) -> Result<Cursor, ChainViewError> {
Ok(self.recent.info.last_block.clone())
}
Expand Down
2 changes: 1 addition & 1 deletion common/src/chain_view/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ mod sync;
mod view;

pub use self::error::ChainViewError;
pub use self::full::{CanonicalCursor, NextCursor};
pub use self::full::{CanonicalCursor, NextCursor, ValidatedCursor};
pub use self::sync::{chain_view_sync_loop, ChainViewSyncService};
pub use self::view::ChainView;
10 changes: 9 additions & 1 deletion common/src/chain_view/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::Cursor;

use super::{
error::ChainViewError,
full::{FullCanonicalChain, NextCursor},
full::{FullCanonicalChain, NextCursor, ValidatedCursor},
CanonicalCursor,
};

Expand Down Expand Up @@ -106,6 +106,14 @@ impl ChainView {
inner.canonical.get_next_cursor(cursor).await
}

pub async fn validate_cursor(
&self,
cursor: &Cursor,
) -> Result<ValidatedCursor, ChainViewError> {
let inner = self.0.read().await;
inner.canonical.validate_cursor(cursor).await
}

pub async fn get_canonical(
&self,
block_number: u64,
Expand Down
2 changes: 1 addition & 1 deletion common/src/compaction/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl SegmentGroupService {
.await
.change_context(CompactionError)?
{
let NextCursor::Continue(cursor) = chain_view
let NextCursor::Continue { cursor, .. } = chain_view
.get_next_cursor(&Some(cursor.clone()))
.await
.change_context(CompactionError)?
Expand Down
7 changes: 5 additions & 2 deletions common/src/compaction/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl SegmentService {
.await
.change_context(CompactionError)?
{
let NextCursor::Continue(cursor) = chain_view
let NextCursor::Continue { cursor, .. } = chain_view
.get_next_cursor(&Some(cursor.clone()))
.await
.change_context(CompactionError)?
Expand Down Expand Up @@ -115,7 +115,10 @@ impl SegmentService {
.attach_printable("failed to add block to segment")
.attach_printable_lazy(|| format!("cursor: {current}"))?;

let NextCursor::Continue(next_cursor) = chain_view
let NextCursor::Continue {
cursor: next_cursor,
..
} = chain_view
.get_next_cursor(&Some(current.clone()))
.await
.change_context(CompactionError)?
Expand Down
97 changes: 97 additions & 0 deletions common/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ use rkyv::{Archive, Deserialize, Serialize};
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Archive, Serialize, Deserialize, Default)]
pub struct Hash(pub Vec<u8>);

impl Hash {
pub fn len(&self) -> usize {
self.0.len()
}

pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

/// Cursor uniquely identifies a block by its number and hash.
#[derive(Clone, PartialEq, Eq, Hash, Archive, Serialize, Deserialize)]
pub struct Cursor {
Expand Down Expand Up @@ -47,6 +57,34 @@ impl Cursor {
pub fn hash_as_hex(&self) -> String {
format!("{}", self.hash)
}

pub fn is_equivalent(&self, other: &Self) -> bool {
if self.number != other.number {
return false;
}

if self.hash.is_empty() || other.hash.is_empty() {
return true;
}

if self.hash.len() == other.hash.len() {
return self.hash == other.hash;
}

// Check that the two hashes end with the same bytes and that the longer hash extra bytes are all zero.
let min_len = std::cmp::min(self.hash.len(), other.hash.len());

let prefix = &self.hash.0[self.hash.len() - min_len..];
let other_prefix = &other.hash.0[other.hash.len() - min_len..];

if self.hash.len() > other.hash.len() {
let len_diff = self.hash.len() - other.hash.len();
return other_prefix == prefix && self.hash.0[0..len_diff] == vec![0; len_diff];
}

let len_diff = other.hash.len() - self.hash.len();
prefix == other_prefix && other.hash.0[0..len_diff] == vec![0; len_diff]
}
}

impl Hash {
Expand Down Expand Up @@ -124,3 +162,62 @@ pub mod testing {
}
}
}

#[cfg(test)]
mod tests {
use crate::Hash;

use super::Cursor;

#[test]
fn test_cursor_is_equivalent() {
{
let cursor = Cursor::new(1, Hash::default());
let other = Cursor::new(1, Hash([0, 0, 1].to_vec()));
assert!(cursor.is_equivalent(&other));
assert!(other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash::default());
let other = Cursor::new(1, Hash::default());
assert!(cursor.is_equivalent(&other));
assert!(other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash::default());
let other = Cursor::new(2, Hash::default());
assert!(!cursor.is_equivalent(&other));
assert!(!other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
let other = Cursor::new(1, Hash([0, 1].to_vec()));
assert!(cursor.is_equivalent(&other));
assert!(other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash([0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4].to_vec()));
let other = Cursor::new(1, Hash([1, 2, 3, 4].to_vec()));
assert!(cursor.is_equivalent(&other));
assert!(other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
let other = Cursor::new(1, Hash([0, 0, 2].to_vec()));
assert!(!cursor.is_equivalent(&other));
assert!(!other.is_equivalent(&cursor));
}

{
let cursor = Cursor::new(1, Hash([0, 0, 1].to_vec()));
let other = Cursor::new(1, Hash([0, 0, 1, 1].to_vec()));
assert!(!cursor.is_equivalent(&other));
assert!(!other.is_equivalent(&cursor));
}
}
}
Loading

0 comments on commit b480d68

Please sign in to comment.