From 8834a29034e942edd10d478539d2316578bf7b24 Mon Sep 17 00:00:00 2001
From: CalvinNeo <calvinneo1995@gmail.com>
Date: Thu, 27 Jul 2023 14:48:05 +0800
Subject: [PATCH] z

Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
---
 .../src/core/forward_raft/snapshot.rs         | 28 ++++++++++++
 proxy_tests/proxy/shared/snapshot.rs          | 43 +++++++++++++++++++
 2 files changed, 71 insertions(+)

diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs
index a9b207b09c4..e825d0049d7 100644
--- a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs
+++ b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs
@@ -110,6 +110,34 @@ impl ProxyForwarder {
         peer_id: u64,
         snap_key: &store::SnapKey,
         snap: Option<&store::Snapshot>,
+    ) {
+        #[cfg(any(test, feature = "testexport"))]
+        {
+            #[allow(clippy::redundant_closure_call)]
+            let mock_duplicated_snapshot: bool = (|| {
+                fail::fail_point!("on_ob_pre_handle_duplicated", |t| {
+                    let t = t.unwrap().parse::<u64>().unwrap();
+                    t
+                });
+                0
+            })() != 0;
+            if mock_duplicated_snapshot {
+                // A handling snapshot may block handling later MsgAppend.
+                // So we fake send.
+                debug!("mock duplicated snapshot");
+                self.pre_apply_snapshot_impl(ob_region, peer_id, snap_key, snap)
+            }
+        }
+        self.pre_apply_snapshot_impl(ob_region, peer_id, snap_key, snap)
+    }
+
+    #[allow(clippy::single_match)]
+    pub fn pre_apply_snapshot_impl(
+        &self,
+        ob_region: &Region,
+        peer_id: u64,
+        snap_key: &store::SnapKey,
+        snap: Option<&store::Snapshot>,
     ) {
         let region_id = ob_region.get_id();
         info!("pre apply snapshot";
diff --git a/proxy_tests/proxy/shared/snapshot.rs b/proxy_tests/proxy/shared/snapshot.rs
index c43d1ab6513..706af9ad437 100644
--- a/proxy_tests/proxy/shared/snapshot.rs
+++ b/proxy_tests/proxy/shared/snapshot.rs
@@ -449,3 +449,46 @@ fn test_many_concurrent_snapshot() {
 
     cluster.shutdown();
 }
+
+#[test]
+fn test_duplicated_snapshot() {
+    let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
+
+    disable_auto_gen_compact_log(&mut cluster);
+    // Disable default max peer count check.
+    pd_client.disable_default_operator();
+    let _ = cluster.run_conf_change();
+
+    cluster.must_put(b"k1", b"v1");
+    check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1]));
+
+    let region = cluster.get_region(b"k1");
+    let region_id = region.get_id();
+
+    let pending_count = cluster
+        .engines
+        .get(&2)
+        .unwrap()
+        .kv
+        .proxy_ext
+        .pending_applies_count
+        .clone();
+
+    cluster.must_put(b"k2", b"v");
+    // Mock if we received the next snapshot when the first one is still handling.
+    fail::cfg("on_ob_pre_handle_duplicated", "return(1)").unwrap();
+    pd_client.must_add_peer(region_id, new_peer(2, 2));
+
+    std::thread::sleep(std::time::Duration::from_millis(500));
+    check_key(
+        &cluster,
+        b"k1",
+        b"v1",
+        Some(true),
+        Some(true),
+        Some(vec![2]),
+    );
+    assert_eq!(pending_count.load(Ordering::SeqCst), 0);
+    fail::remove("on_ob_pre_handle_duplicated");
+    cluster.shutdown();
+}