-
-
Notifications
You must be signed in to change notification settings - Fork 61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(consumers): rust consumers quantized rebalance #6561
Conversation
rebalancing::delay_kafka_rebalance(secs); | ||
handle.signal_shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I correct in assuming that this closure that is passed in runs in a separate thread, so the main loop is not blocked while it is sleeping before signaling shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the documentation it seems to be the case
https://docs.rs/ctrlc/latest/ctrlc/fn.set_handler.html
Register signal handler for Ctrl-C.
Starts a new dedicated signal handling thread. Should only be called once, typically at the start of your program.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
This seems reasonable to me, do we plan to have quantized rebalancing on the other end? I.e. Making the consumer wait a little bit before joining the consumer group |
@john-z-yang it does wait before joining, see (and I tested that this happens locally as well) |
PR reverted: 4f8cfa9 |
This reverts commit f5116c1. Co-authored-by: volokluev <[email protected]>
Add the ability to do quantized rebalancing for consumers, toggleable by runtime config. ### What is quantized rebalancing? Rebalancing events are synchronized to the tick of the clock. Controlled by the runtime config: `quantized_rebalance_consumer_group_delay_secs__{consumer_group}` Let's say: `SET quantized_rebalance_consumer_group_delay_secs__spans=15` * When a consumer starts up: delay subscribing to a topic until `timestamp % 15 == 0` * when a consumer gets a shutdown signal (ctrl-c): delay leaving the group until `timestamp % 15 == 0` ### What else is in this PR? This PR adds the ability to read from redis config directly. This was mostly done because I couldn't get the python bindings to work on my machine, it will still fall back to the old implementation if there is an error in the direct implementation
reapply #6561 but remove the direct redis call. thanks to volo for writing basically all of the code (with tests!) personal opinion: redis is not a great API boundary to have (there's very few things in redis that help you abstract over the raw datastructure), and using it directly from two codebases would be like accessing the same postgres from multiple microservices. it seems to work fine on localhost. for future reference, I did: ``` make watch-rust-snuba # then make my changes # then go to snuba admin and set the setting for consumer group lol4 RUST_LOG=info snuba rust-consumer --log-level info --storage eap_spans --consumer-group lol4 --concurrency 2 --use-rust-processor --max-batch-size 1000000 --max-batch-time-ms 100 --auto-offset-reset latest --no-strict-offset-reset ``` watch the log output say `Delaying rebalance by 4 seconds` --------- Co-authored-by: volokluev <[email protected]>
Add the ability to do quantized rebalancing for consumers, toggleable by runtime config.
What is quantized rebalancing?
Rebalancing events are synchronized to the tick of the clock. Controlled by the runtime config:
quantized_rebalance_consumer_group_delay_secs__{consumer_group}
Let's say:
SET quantized_rebalance_consumer_group_delay_secs__spans=15
timestamp % 15 == 0
timestamp % 15 == 0
What else is in this PR?
This PR adds the ability to read from redis config directly. This was mostly done because I couldn't get the python bindings to work on my machine, it will still fall back to the old implementation if there is an error in the direct implementation