From 299e64c3c063d5971c02dd291ddad185215b418c Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sat, 16 Nov 2024 11:34:45 +0100 Subject: [PATCH] Make command and commit queues bounded Addresses #1386 --- .../src/main/scala/zio/kafka/consumer/ConsumerSettings.scala | 4 +++- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index e5e63771f..fd1de8e3a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -32,7 +32,9 @@ final case class ConsumerSettings( metricLabels: Set[MetricLabel] = Set.empty, runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis), authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis), - maxStreamPullIntervalOption: Option[Duration] = None + maxStreamPullIntervalOption: Option[Duration] = None, + commandQueueSize: Int = 1024, // Internal setting, should not be necessary to tune + commitQueueSize: Int = 1024 // Internal setting, should not be necessary to tune ) { // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType: require( diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8d45f8e55..194a101f3 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -953,8 +953,8 @@ object Runloop { ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + commitQueue <- ZIO.acquireRelease(Queue.bounded[Runloop.Commit](settings.commitQueueSize))(_.shutdown) + commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](settings.commandQueueSize))(_.shutdown) lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState)