From 13961b7ed6c9a56ffb6cda26d1f04211c940d023 Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Fri, 10 Jan 2025 17:00:47 +0700 Subject: [PATCH 1/3] kafka-broker-dispatcher rate limiter --- control-plane/pkg/reconciler/trigger/trigger.go | 16 ++++++++++++++++ .../impl/consumer/OrderedConsumerVerticle.java | 7 +++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index c63ea0ebbf..02364ffe4a 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" "sync" + "strconv" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -53,6 +54,7 @@ import ( const ( deliveryOrderAnnotation = "kafka.eventing.knative.dev/delivery.order" + triggerVReplicaAnnotation = "kafka.eventing.knative.dev/vreplica" ) type FlagsHolder struct { @@ -325,6 +327,10 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin Namespace: trigger.GetNamespace(), Name: trigger.GetName(), }, + FeatureFlags: &contract.EgressFeatureFlags{ + EnableRateLimiter: r.KafkaFeatureFlags.IsDispatcherRateLimiterEnabled(), + EnableOrderedExecutorMetrics: r.KafkaFeatureFlags.IsDispatcherOrderedExecutorMetricsEnabled(), + }, } if destination.CACerts != nil { @@ -371,6 +377,16 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin egress.DeliveryOrder = deliveryOrder } + if r.KafkaFeatureFlags.IsDispatcherRateLimiterEnabled() { + triggerVReplicaAnnotationValue, ok := trigger.Annotations[triggerVReplicaAnnotation] + if ok { + vReplicas, err := strconv.Atoi(triggerVReplicaAnnotationValue) + if err == nil { + egress.VReplicas = int32(vReplicas) + } + } + } + return egress, nil } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java 
b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index fe813f3115..7b32d69ee3 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -63,8 +63,9 @@ public OrderedConsumerVerticle(final ConsumerVerticleContext context, final Init final var vReplicas = Math.max(1, context.getEgress().getVReplicas()); final var tokens = context.getMaxPollRecords() * vReplicas; if (context.getEgress().getFeatureFlags().getEnableRateLimiter()) { + // using intervally will be more effective for precise rate limiting. this.bucket = new LocalBucketBuilder() - .addLimit(Bandwidth.classic(tokens, Refill.greedy(tokens, Duration.ofSeconds(1)))) + .addLimit(Bandwidth.classic(tokens, Refill.intervally(tokens, Duration.ofSeconds(1)))) .withSynchronizationStrategy(SynchronizationStrategy.SYNCHRONIZED) .withMillisecondPrecision() .build(); @@ -181,7 +182,9 @@ private void recordsHandler(ConsumerRecords records) { if (bucket != null) { // Once we have new records, we force add them to internal per-partition queues. - bucket.forceAddTokens(records.count()); + // bucket.forceAddTokens(records.count()); + // I think there are some mistake here since we need to consume message, not add more tokens: https://www.baeldung.com/spring-bucket4j + bucket.tryConsume(records.count()); } // Put records in internal per-partition queues. 
From 4ee6a65ca2c6ed067690673be3c2ab931299a82f Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Fri, 10 Jan 2025 17:17:22 +0700 Subject: [PATCH 2/3] kafka-broker-dispatcher rate limiter --- .../dispatcher/impl/consumer/OrderedConsumerVerticle.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index 7b32d69ee3..b03c0fd0ec 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -183,7 +183,7 @@ private void recordsHandler(ConsumerRecords records) { if (bucket != null) { // Once we have new records, we force add them to internal per-partition queues. 
// bucket.forceAddTokens(records.count()); - // I think there are some mistake here since we need to consume message, not add more tokens: https://www.baeldung.com/spring-bucket4j + // I think there is a mistake here, since we need to consume tokens for each message, not add more tokens: https://github.com/bucket4j/bucket4j bucket.tryConsume(records.count()); } From d310486afb242463091d386331f609fc2bf349e5 Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Mon, 13 Jan 2025 09:41:46 +0700 Subject: [PATCH 3/3] kafka-broker-dispatcher rate limiter --- control-plane/pkg/reconciler/trigger/trigger.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index 02364ffe4a..b715fca60e 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -54,7 +54,7 @@ import ( const ( deliveryOrderAnnotation = "kafka.eventing.knative.dev/delivery.order" - triggerVReplicaAnnotation = "kafka.eventing.knative.dev/vreplica" + triggerVReplicasAnnotation = "kafka.eventing.knative.dev/vreplicas" ) type FlagsHolder struct { @@ -378,9 +378,9 @@ func (r *Reconciler) reconcileTriggerEgress(ctx context.Context, broker *eventin if r.KafkaFeatureFlags.IsDispatcherRateLimiterEnabled() { - triggerVReplicaAnnotationValue, ok := trigger.Annotations[triggerVReplicaAnnotation] + triggerVReplicasAnnotationValue, ok := trigger.Annotations[triggerVReplicasAnnotation] if ok { - vReplicas, err := strconv.Atoi(triggerVReplicaAnnotationValue) + vReplicas, err := strconv.Atoi(triggerVReplicasAnnotationValue) if err == nil { egress.VReplicas = int32(vReplicas) }