Skip to content

Commit

Permalink
Revert "feat(consumers): rust consumers quantized rebalance (#6561)"
Browse files Browse the repository at this point in the history
This reverts commit f5116c1.

Co-authored-by: volokluev <[email protected]>
  • Loading branch information
getsentry-bot and volokluev committed Nov 15, 2024
1 parent f3b9a1c commit 4f8cfa9
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 203 deletions.
40 changes: 0 additions & 40 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ data-encoding = "2.5.0"
zstd = "0.12.3"
serde_with = "3.8.1"
seq-macro = "0.3"
redis = "0.27.5"


[patch.crates-io]
Expand Down
25 changes: 4 additions & 21 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::metrics::global_tags::set_global_tag;
use crate::metrics::statsd::StatsDBackend;
use crate::mutations::factory::MutConsumerStrategyFactory;
use crate::processors;
use crate::rebalancing;
use crate::types::{InsertOrReplacement, KafkaMessageMetadata};

#[pyfunction]
Expand Down Expand Up @@ -235,11 +234,6 @@ pub fn consumer_impl(

let topic = Topic::new(&consumer_config.raw_topic.physical_topic_name);

let rebalance_delay_secs = rebalancing::get_rebalance_delay_secs(consumer_group);
if let Some(secs) = rebalance_delay_secs {
rebalancing::delay_kafka_rebalance(secs)
}

let processor = if mutations_mode {
let mut_factory = MutConsumerStrategyFactory {
storage_config: first_storage,
Expand Down Expand Up @@ -292,21 +286,10 @@ pub fn consumer_impl(

let mut handle = processor.get_handle();

match rebalance_delay_secs {
Some(secs) => {
ctrlc::set_handler(move || {
rebalancing::delay_kafka_rebalance(secs);
handle.signal_shutdown();
})
.expect("Error setting Ctrl-C handler");
}
None => {
ctrlc::set_handler(move || {
handle.signal_shutdown();
})
.expect("Error setting Ctrl-C handler");
}
}
ctrlc::set_handler(move || {
handle.signal_shutdown();
})
.expect("Error setting Ctrl-C handler");

if let Err(error) = processor.run() {
let error: &dyn std::error::Error = &error;
Expand Down
1 change: 0 additions & 1 deletion rust_snuba/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod logging;
mod metrics;
mod mutations;
mod processors;
mod rebalancing;
mod runtime_config;
mod strategies;
mod types;
Expand Down
81 changes: 0 additions & 81 deletions rust_snuba/src/rebalancing.rs

This file was deleted.

59 changes: 0 additions & 59 deletions rust_snuba/src/runtime_config.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,18 @@
use anyhow::Error;
use parking_lot::RwLock;
use pyo3::prelude::{PyModule, Python};
use redis::Commands;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::time::Duration;

use rust_arroyo::timer;
use rust_arroyo::utils::timing::Deadline;

static CONFIG: RwLock<BTreeMap<String, (Option<String>, Deadline)>> = RwLock::new(BTreeMap::new());

static CONFIG_HASHSET_KEY: &str = "snuba-config";

fn get_redis_client() -> Result<redis::Client, redis::RedisError> {
let redis_host = std::env::var("REDIS_HOST").unwrap_or(String::from("127.0.0.1"));
let redis_port = std::env::var("REDIS_PORT").unwrap_or(String::from("6379"));
let redis_password = std::env::var("REDIS_PASSWORD").unwrap_or(String::from(""));
let redis_db = std::env::var("REDIS_DB").unwrap_or(String::from("1"));
// TODO: handle SSL?
let url = format!(
"redis://{}:{}@{}:{}/{}",
"default", redis_password, redis_host, redis_port, redis_db
);
redis::Client::open(url)
}

fn get_str_config_direct(key: &str) -> Result<Option<String>, Error> {
let deadline = Deadline::new(Duration::from_secs(10));

let client = get_redis_client()?;
let mut con = client.get_connection()?;

let configmap: HashMap<String, String> = con.hgetall(CONFIG_HASHSET_KEY)?;
let val = match configmap.get(key) {
Some(val) => Some(val.clone()),
None => return Ok(None),
};

CONFIG
.write()
.insert(key.to_string(), (val.clone(), deadline));
Ok(CONFIG.read().get(key).unwrap().0.clone())
}

#[allow(dead_code)]
pub fn set_str_config_direct(key: &str, val: &str) -> Result<(), Error> {
let client = get_redis_client()?;
let mut con = client.get_connection()?;
con.hset(CONFIG_HASHSET_KEY, key, val)?;
Ok(())
}

#[allow(dead_code)]
pub fn del_str_config_direct(key: &str) -> Result<(), Error> {
let client = get_redis_client()?;
let mut con = client.get_connection()?;
con.hdel(CONFIG_HASHSET_KEY, key)?;
Ok(())
}

/// Runtime config is cached for 10 seconds
pub fn get_str_config(key: &str) -> Result<Option<String>, Error> {
let deadline = Deadline::new(Duration::from_secs(10));

match get_str_config_direct(key) {
Ok(val) => return Ok(val),
Err(error) => tracing::error!(
"Could not get config from redis directly, falling back to python {}",
error
),
}

if let Some(value) = CONFIG.read().get(key) {
let (config, deadline) = value;
if !deadline.has_elapsed() {
Expand Down

0 comments on commit 4f8cfa9

Please sign in to comment.