Skip to content

Commit

Permalink
fix(hermes): ignore broadcast send result (#1450)
Browse files Browse the repository at this point in the history
Sending over the broadcast channel only fails when there are no
receivers. We should ignore it instead of propagating it.
  • Loading branch information
ali-bahjati authored Apr 15, 2024
1 parent a607335 commit 392a3df
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
19 changes: 9 additions & 10 deletions hermes/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,23 +264,22 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
// Update the aggregate state
let mut aggregate_state = state.aggregate_state.write().await;

// Check if the update is new or out of order
match aggregate_state.latest_completed_slot {
// Send update event to subscribers. We are purposefully ignoring the result
// because there might be no subscribers.
let _ = match aggregate_state.latest_completed_slot {
None => {
aggregate_state.latest_completed_slot.replace(slot);
state.api_update_tx.send(AggregationEvent::New { slot })?;
state.api_update_tx.send(AggregationEvent::New { slot })
}
Some(latest) if slot > latest => {
state.prune_removed_keys(message_state_keys).await;
aggregate_state.latest_completed_slot.replace(slot);
state.api_update_tx.send(AggregationEvent::New { slot })?;
state.api_update_tx.send(AggregationEvent::New { slot })
}
_ => {
state
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot })?;
}
}
_ => state
.api_update_tx
.send(AggregationEvent::OutOfOrder { slot }),
};

aggregate_state.latest_completed_slot = aggregate_state
.latest_completed_slot
Expand Down
23 changes: 14 additions & 9 deletions hermes/src/network/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ pub async fn process_message(state: Arc<State>, vaa_bytes: Vec<u8>) -> Result<()
)?;

// Finally, store the resulting VAA in Hermes.
store_vaa(state.clone(), vaa.sequence, vaa_bytes).await?;
let sequence = vaa.sequence;
tokio::spawn(async move {
store_vaa(state.clone(), sequence, vaa_bytes).await;
});

Ok(())
}
Expand Down Expand Up @@ -334,22 +337,24 @@ pub fn verify_vaa<'a>(
}

#[tracing::instrument(skip(state, vaa_bytes))]
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) -> Result<()> {
pub async fn store_vaa(state: Arc<State>, sequence: u64, vaa_bytes: Vec<u8>) {
// Check VAA hasn't already been seen, this may have been checked previously
// but due to async nature It's possible other threads have mutated the state
// but due to async nature it's possible other threads have mutated the state
// since this VAA started processing.
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
ensure!(
!observed_vaa_seqs.contains(&sequence),
"Previously observed VAA: {}",
sequence,
);
if observed_vaa_seqs.contains(&sequence) {
return;
}

// Clear old cached VAA sequences.
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first();
}

// Hand the VAA to the aggregate store.
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
if let Err(e) =
crate::aggregate::store_update(&state, crate::aggregate::Update::Vaa(vaa_bytes)).await
{
tracing::error!(error = ?e, "Failed to store VAA in aggregate store.");
}
}

0 comments on commit 392a3df

Please sign in to comment.