From 4f8cfa9a4d25f5110877edf163eb7ef05cd19d0e Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Fri, 15 Nov 2024 22:12:07 +0000 Subject: [PATCH] Revert "feat(consumers): rust consumers quantized rebalance (#6561)" This reverts commit f5116c104a6dda0709027766c2faca7e85b94d89. Co-authored-by: volokluev <3169433+volokluev@users.noreply.github.com> --- rust_snuba/Cargo.lock | 40 ---------------- rust_snuba/Cargo.toml | 1 - rust_snuba/src/consumer.rs | 25 ++-------- rust_snuba/src/lib.rs | 1 - rust_snuba/src/rebalancing.rs | 81 -------------------------------- rust_snuba/src/runtime_config.rs | 59 ----------------------- 6 files changed, 4 insertions(+), 203 deletions(-) delete mode 100644 rust_snuba/src/rebalancing.rs diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 0a72bd7729..5d82f30ef7 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -124,12 +124,6 @@ dependencies = [ "backtrace", ] -[[package]] -name = "arc-swap" -version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" - [[package]] name = "ascii-canvas" version = "3.0.0" @@ -641,16 +635,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "combine" -version = "4.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" -dependencies = [ - "bytes", - "memchr", -] - [[package]] name = "concurrent-queue" version = "2.4.0" @@ -2728,23 +2712,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "redis" -version = "0.27.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81cccf17a692ce51b86564334614d72dcae1def0fd5ecebc9f02956da74352b5" -dependencies = [ - "arc-swap", - "combine", - "itoa", - "num-bigint", - "percent-encoding", - "ryu", - "sha1_smol", - "socket2 0.5.6", - "url", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -2902,7 +2869,6 @@ dependencies = [ "parking_lot", "procspawn", "pyo3", - "redis", "reqwest", "rust_arroyo", "schemars", @@ -3358,12 +3324,6 @@ dependencies = [ "unsafe-libyaml", ] -[[package]] -name = "sha1_smol" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" - [[package]] name = "sha2" version = "0.10.8" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 62d7a1507a..210a81c3c4 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -54,7 +54,6 @@ data-encoding = "2.5.0" zstd = "0.12.3" serde_with = "3.8.1" seq-macro = "0.3" -redis = "0.27.5" [patch.crates-io] diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index bc5e51aa59..27f3ff429e 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -23,7 +23,6 @@ use crate::metrics::global_tags::set_global_tag; use crate::metrics::statsd::StatsDBackend; use crate::mutations::factory::MutConsumerStrategyFactory; use crate::processors; -use crate::rebalancing; use crate::types::{InsertOrReplacement, KafkaMessageMetadata}; #[pyfunction] @@ -235,11 +234,6 @@ pub fn consumer_impl( let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name); - let rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group); - if let Some(secs) = rebalance_delay_secs { - rebalancing::delay_kafka_rebalance(secs) - } - let processor = if mutations_mode { let mut_factory = MutConsumerStrategyFactory { storage_config: first_storage, @@ -292,21 +286,10 @@ pub fn consumer_impl( let mut handle = processor.get_handle(); - match rebalance_delay_secs { - Some(secs) => { - ctrlc::set_handler(move || { - rebalancing::delay_kafka_rebalance(secs); - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); - } - None => { - ctrlc::set_handler(move || { - handle.signal_shutdown(); - }) - .expect("Error setting Ctrl-C handler"); - } - } + ctrlc::set_handler(move || { + handle.signal_shutdown(); + }) + .expect("Error setting Ctrl-C handler"); if let Err(error) = processor.run() { let error: &dyn std::error::Error = &error; diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 9c74a3f738..35a97c3404 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -6,7 +6,6 @@ mod logging; mod metrics; mod mutations; mod processors; -mod rebalancing; mod runtime_config; mod strategies; mod types; diff --git a/rust_snuba/src/rebalancing.rs b/rust_snuba/src/rebalancing.rs deleted file mode 100644 index b57a0aeaac..0000000000 --- a/rust_snuba/src/rebalancing.rs +++ /dev/null @@ -1,81 +0,0 @@ -use crate::runtime_config; -use std::thread; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -pub fn delay_kafka_rebalance(configured_delay_secs: u64) { - /* - * Introduces a configurable delay to the consumer topic - * subscription and consumer shutdown steps (handled by the - * StreamProcessor). The idea behind is that by forcing - * these steps to occur at certain time "ticks" (for example, at - * every 15 second tick in a minute), we can reduce the number of - * rebalances that are triggered during a deploy. This means - * fewer "stop the world rebalancing" occurrences and more time - * for the consumer group to stabilize and make progress. - */ - let current_time = SystemTime::now(); - let time_elapsed_in_slot = match current_time.duration_since(UNIX_EPOCH) { - Ok(duration) => duration.as_secs(), - Err(_) => 0, - } % configured_delay_secs; - tracing::info!( - "Delaying rebalance by {} seconds", - configured_delay_secs - time_elapsed_in_slot - ); - - thread::sleep(Duration::from_secs( - configured_delay_secs - time_elapsed_in_slot, - )); -} - -pub fn get_rebalance_delay_secs(consumer_group: &str) -> Option { - match runtime_config::get_str_config( - format!( - "quantized_rebalance_consumer_group_delay_secs__{}", - consumer_group - ) - .as_str(), - ) { - Ok(delay_secs) => match delay_secs { - Some(secs) => match secs.parse() { - Ok(v) => Some(v), - Err(_) => None, - }, - None => None, - }, - Err(_) => None, - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_delay_config() { - runtime_config::del_str_config_direct( - "quantized_rebalance_consumer_group_delay_secs__spans", - ) - .unwrap(); - let delay_secs = get_rebalance_delay_secs("spans"); - assert_eq!(delay_secs, None); - runtime_config::set_str_config_direct( - "quantized_rebalance_consumer_group_delay_secs__spans", - "420", - ) - .unwrap(); - let delay_secs = get_rebalance_delay_secs("spans"); - assert_eq!(delay_secs, Some(420)); - runtime_config::set_str_config_direct( - "quantized_rebalance_consumer_group_delay_secs__spans", - "garbage", - ) - .unwrap(); - let delay_secs = get_rebalance_delay_secs("spans"); - assert_eq!(delay_secs, None); - runtime_config::del_str_config_direct( - "quantized_rebalance_consumer_group_delay_secs__spans", - ) - .unwrap(); - } -} diff --git a/rust_snuba/src/runtime_config.rs b/rust_snuba/src/runtime_config.rs index 9d385a6b01..ef24bfa230 100644 --- a/rust_snuba/src/runtime_config.rs +++ b/rust_snuba/src/runtime_config.rs @@ -1,9 +1,7 @@ use anyhow::Error; use parking_lot::RwLock; use pyo3::prelude::{PyModule, Python}; -use redis::Commands; use std::collections::BTreeMap; -use std::collections::HashMap; use std::time::Duration; use rust_arroyo::timer; @@ -11,67 +9,10 @@ use rust_arroyo::utils::timing::Deadline; static CONFIG: RwLock, Deadline)>> = RwLock::new(BTreeMap::new()); -static CONFIG_HASHSET_KEY: &str = "snuba-config"; - -fn get_redis_client() -> Result { - let redis_host = std::env::var("REDIS_HOST").unwrap_or(String::from("127.0.0.1")); - let redis_port = std::env::var("REDIS_PORT").unwrap_or(String::from("6379")); - let redis_password = std::env::var("REDIS_PASSWORD").unwrap_or(String::from("")); - let redis_db = std::env::var("REDIS_DB").unwrap_or(String::from("1")); - // TODO: handle SSL? - let url = format!( - "redis://{}:{}@{}:{}/{}", - "default", redis_password, redis_host, redis_port, redis_db - ); - redis::Client::open(url) -} - -fn get_str_config_direct(key: &str) -> Result, Error> { - let deadline = Deadline::new(Duration::from_secs(10)); - - let client = get_redis_client()?; - let mut con = client.get_connection()?; - - let configmap: HashMap = con.hgetall(CONFIG_HASHSET_KEY)?; - let val = match configmap.get(key) { - Some(val) => Some(val.clone()), - None => return Ok(None), - }; - - CONFIG - .write() - .insert(key.to_string(), (val.clone(), deadline)); - Ok(CONFIG.read().get(key).unwrap().0.clone()) -} - -#[allow(dead_code)] -pub fn set_str_config_direct(key: &str, val: &str) -> Result<(), Error> { - let client = get_redis_client()?; - let mut con = client.get_connection()?; - con.hset(CONFIG_HASHSET_KEY, key, val)?; - Ok(()) -} - -#[allow(dead_code)] -pub fn del_str_config_direct(key: &str) -> Result<(), Error> { - let client = get_redis_client()?; - let mut con = client.get_connection()?; - con.hdel(CONFIG_HASHSET_KEY, key)?; - Ok(()) -} - /// Runtime config is cached for 10 seconds pub fn get_str_config(key: &str) -> Result, Error> { let deadline = Deadline::new(Duration::from_secs(10)); - match get_str_config_direct(key) { - Ok(val) => return Ok(val), - Err(error) => tracing::error!( - "Could not get config from redis directly, falling back to python {}", - error - ), - } - if let Some(value) = CONFIG.read().get(key) { let (config, deadline) = value; if !deadline.has_elapsed() {