From 58e65a7a0681e212c5fec0134964cae4ba6e7cf2 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 5 Jan 2025 20:46:24 -0400 Subject: [PATCH] Change collector's queue to use generics (#6486) ## Which problem is this PR solving? - Continuation of #6474 ## Description of the changes - In order to allow the queue to carry both v1 and v2 data model, let's first make the queue strongly typed by using generics ## How was this change tested? - unit tests, CI --------- Signed-off-by: Yuri Shkuro Signed-off-by: adityachopra29 --- cmd/collector/app/span_processor.go | 17 ++++---- pkg/queue/bounded_queue.go | 63 ++++++++++++++--------------- pkg/queue/bounded_queue_test.go | 36 ++++++++--------- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index a3541cd3c6c..64832f0eaca 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -30,7 +30,7 @@ const ( ) type spanProcessor struct { - queue *queue.BoundedQueue + queue *queue.BoundedQueue[queueItem] queueResizeMu sync.Mutex metrics *SpanProcessorMetrics preProcessSpans ProcessSpans @@ -63,9 +63,8 @@ func NewSpanProcessor( ) processor.SpanProcessor { sp := newSpanProcessor(spanWriter, additional, opts...) - sp.queue.StartConsumers(sp.numWorkers, func(item any) { - value := item.(*queueItem) - sp.processItemFromQueue(value) + sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) { + sp.processItemFromQueue(item) }) sp.background(1*time.Second, sp.updateGauges) @@ -83,13 +82,13 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt options.serviceMetrics, options.hostMetrics, options.extraFormatTypes) - droppedItemHandler := func(item any) { + droppedItemHandler := func(item queueItem) { handlerMetrics.SpansDropped.Inc(1) if options.onDroppedSpan != nil { - options.onDroppedSpan(item.(*queueItem).span) + options.onDroppedSpan(item.span) } } - boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler) + boundedQueue := queue.NewBoundedQueue[queueItem](options.queueSize, droppedItemHandler) sanitizers := sanitizer.NewStandardSanitizers() if options.sanitizer != nil { @@ -196,7 +195,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { return retMe, nil } -func (sp *spanProcessor) processItemFromQueue(item *queueItem) { +func (sp *spanProcessor) processItemFromQueue(item queueItem) { // TODO calling sanitizer here contradicts the comment in enqueueSpan about immutable Process. sp.processSpan(sp.sanitizer(item.span), item.tenant) sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) @@ -237,7 +236,7 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor. // add format tag span.Tags = append(span.Tags, model.String("internal.span.format", string(originalFormat))) - item := &queueItem{ + item := queueItem{ queuedTime: time.Now(), span: span, tenant: tenant, diff --git a/pkg/queue/bounded_queue.go b/pkg/queue/bounded_queue.go index eb1842d0fa1..3a998082248 100644 --- a/pkg/queue/bounded_queue.go +++ b/pkg/queue/bounded_queue.go @@ -8,14 +8,13 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/jaegertracing/jaeger/pkg/metrics" ) // Consumer consumes data from a bounded queue -type Consumer interface { - Consume(item any) +type Consumer[T any] interface { + Consume(item T) } // BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue, @@ -23,27 +22,27 @@ type Consumer interface { // the producer force the earliest items to be dropped. The implementation is actually based on // channels, with a special Reaper goroutine that wakes up when the queue is full and consumers // the items from the top of the queue until its size drops back to maxSize -type BoundedQueue struct { +type BoundedQueue[T any] struct { workers int stopWG sync.WaitGroup size atomic.Int32 capacity atomic.Uint32 stopped atomic.Uint32 - items *chan any - onDroppedItem func(item any) - factory func() Consumer + items atomic.Pointer[chan T] + onDroppedItem func(item T) + factory func() Consumer[T] stopCh chan struct{} } // NewBoundedQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). -func NewBoundedQueue(capacity int, onDroppedItem func(item any)) *BoundedQueue { - queue := make(chan any, capacity) - bq := &BoundedQueue{ +func NewBoundedQueue[T any](capacity int, onDroppedItem func(item T)) *BoundedQueue[T] { + queue := make(chan T, capacity) + bq := &BoundedQueue[T]{ onDroppedItem: onDroppedItem, - items: &queue, stopCh: make(chan struct{}), } + bq.items.Store(&queue) //nolint: gosec // G115 bq.capacity.Store(uint32(capacity)) return bq @@ -51,7 +50,7 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item any)) *BoundedQueue { // StartConsumersWithFactory creates a given number of consumers consuming items // from the queue in separate goroutines. -func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) { +func (q *BoundedQueue[T]) StartConsumersWithFactory(num int, factory func() Consumer[T]) { q.workers = num q.factory = factory var startWG sync.WaitGroup @@ -62,10 +61,10 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume startWG.Done() defer q.stopWG.Done() consumer := q.factory() - queue := *q.items + queue := q.items.Load() for { select { - case item, ok := <-queue: + case item, ok := <-*queue: if ok { q.size.Add(-1) consumer.Consume(item) @@ -85,23 +84,23 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume // ConsumerFunc is an adapter to allow the use of // a consume function callback as a Consumer. -type ConsumerFunc func(item any) +type ConsumerFunc[T any] func(item T) // Consume calls c(item) -func (c ConsumerFunc) Consume(item any) { +func (c ConsumerFunc[T]) Consume(item T) { c(item) } // StartConsumers starts a given number of goroutines consuming items from the queue // and passing them into the consumer callback. -func (q *BoundedQueue) StartConsumers(num int, callback func(item any)) { - q.StartConsumersWithFactory(num, func() Consumer { - return ConsumerFunc(callback) +func (q *BoundedQueue[T]) StartConsumers(num int, callback func(item T)) { + q.StartConsumersWithFactory(num, func() Consumer[T] { + return ConsumerFunc[T](callback) }) } // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. -func (q *BoundedQueue) Produce(item any) bool { +func (q *BoundedQueue[T]) Produce(item T) bool { if q.stopped.Load() != 0 { q.onDroppedItem(item) return false @@ -117,8 +116,9 @@ func (q *BoundedQueue) Produce(item any) bool { } q.size.Add(1) + queue := q.items.Load() select { - case *q.items <- item: + case *queue <- item: return true default: // should not happen, as overflows should have been captured earlier @@ -132,26 +132,26 @@ func (q *BoundedQueue) Produce(item any) bool { // Stop stops all consumers, as well as the length reporter if started, // and releases the items channel. It blocks until all consumers have stopped. -func (q *BoundedQueue) Stop() { +func (q *BoundedQueue[T]) Stop() { q.stopped.Store(1) // disable producer close(q.stopCh) q.stopWG.Wait() - close(*q.items) + close(*q.items.Load()) } // Size returns the current size of the queue -func (q *BoundedQueue) Size() int { +func (q *BoundedQueue[T]) Size() int { return int(q.size.Load()) } // Capacity returns capacity of the queue -func (q *BoundedQueue) Capacity() int { +func (q *BoundedQueue[T]) Capacity() int { return int(q.capacity.Load()) } // StartLengthReporting starts a timer-based goroutine that periodically reports // current queue length to a given metrics gauge. -func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge) { +func (q *BoundedQueue[T]) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge) { ticker := time.NewTicker(reportPeriod) go func() { defer ticker.Stop() @@ -168,24 +168,23 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me } // Resize changes the capacity of the queue, returning whether the action was successful -func (q *BoundedQueue) Resize(capacity int) bool { +func (q *BoundedQueue[T]) Resize(capacity int) bool { if capacity == q.Capacity() { // noop return false } - previous := *q.items - queue := make(chan any, capacity) + previous := q.items.Load() + queue := make(chan T, capacity) // swap queues - // #nosec - swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue)) + swapped := q.items.CompareAndSwap(previous, &queue) if swapped { // start a new set of consumers, based on the information given previously q.StartConsumersWithFactory(q.workers, q.factory) // gracefully drain the existing queue - close(previous) + close(*previous) // update the capacity //nolint: gosec // G115 diff --git a/pkg/queue/bounded_queue_test.go b/pkg/queue/bounded_queue_test.go index f3597f9c6b8..714b5c396d6 100644 --- a/pkg/queue/bounded_queue_test.go +++ b/pkg/queue/bounded_queue_test.go @@ -23,12 +23,12 @@ import ( // In this test we run a queue with capacity 1 and a single consumer. // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. -func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item any))) { +func helper(t *testing.T, startConsumers func(q *BoundedQueue[string], consumerFn func(item string))) { mFact := metricstest.NewFactory(0) counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil}) gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil}) - q := NewBoundedQueue(1, func( /* item */ any) { + q := NewBoundedQueue[string](1, func( /* item */ string) { counter.Inc(1) }) assert.Equal(t, 1, q.Capacity()) @@ -38,8 +38,8 @@ func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(i startLock.Lock() // block consumers consumerState := newConsumerState(t) - startConsumers(q, func(item any) { - consumerState.record(item.(string)) + startConsumers(q, func(item string) { + consumerState.record(item) // block further processing until startLock is released startLock.Lock() @@ -102,14 +102,14 @@ func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(i } func TestBoundedQueue(t *testing.T) { - helper(t, func(q *BoundedQueue, consumerFn func(item any)) { + helper(t, func(q *BoundedQueue[string], consumerFn func(item string)) { q.StartConsumers(1, consumerFn) }) } func TestBoundedQueueWithFactory(t *testing.T) { - helper(t, func(q *BoundedQueue, consumerFn func(item any)) { - q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) }) + helper(t, func(q *BoundedQueue[string], consumerFn func(item string)) { + q.StartConsumersWithFactory(1, func() Consumer[string] { return ConsumerFunc[string](consumerFn) }) }) } @@ -163,7 +163,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { } func TestResizeUp(t *testing.T) { - q := NewBoundedQueue(2, func(item any) { + q := NewBoundedQueue(2, func(item string) { fmt.Printf("dropped: %v\n", item) }) @@ -173,7 +173,7 @@ func TestResizeUp(t *testing.T) { releaseConsumers.Add(1) released, resized := false, false - q.StartConsumers(1, func( /* item */ any) { + q.StartConsumers(1, func( /* item */ string) { if !resized { // we'll have a second consumer once the queue is resized // signal that the worker is processing firstConsumer.Done() @@ -194,7 +194,7 @@ func TestResizeUp(t *testing.T) { assert.False(t, q.Produce("d")) // dropped assert.EqualValues(t, 2, q.Capacity()) assert.EqualValues(t, q.Capacity(), q.Size()) - assert.Len(t, *q.items, q.Capacity()) + assert.Len(t, *q.items.Load(), q.Capacity()) resized = true assert.True(t, q.Resize(4)) @@ -207,14 +207,14 @@ func TestResizeUp(t *testing.T) { assert.EqualValues(t, 4, q.Capacity()) assert.EqualValues(t, q.Capacity(), q.Size()) // the combined queues are at the capacity right now - assert.Len(t, *q.items, 2) // the new internal queue should have two items only + assert.Len(t, *q.items.Load(), 2) // the new internal queue should have two items only released = true releaseConsumers.Done() } func TestResizeDown(t *testing.T) { - q := NewBoundedQueue(4, func(item any) { + q := NewBoundedQueue(4, func(item string) { fmt.Printf("dropped: %v\n", item) }) @@ -223,7 +223,7 @@ func TestResizeDown(t *testing.T) { releaseConsumers.Add(1) released := false - q.StartConsumers(1, func( /* item */ any) { + q.StartConsumers(1, func( /* item */ string) { // once we release the lock, we might end up with multiple calls to reach this if !released { // signal that the worker is processing @@ -244,14 +244,14 @@ func TestResizeDown(t *testing.T) { assert.True(t, q.Produce("e")) // dropped assert.EqualValues(t, 4, q.Capacity()) assert.EqualValues(t, q.Capacity(), q.Size()) - assert.Len(t, *q.items, q.Capacity()) + assert.Len(t, *q.items.Load(), q.Capacity()) assert.True(t, q.Resize(2)) assert.False(t, q.Produce("f")) // dropped assert.EqualValues(t, 2, q.Capacity()) assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a while over capacity - assert.Empty(t, *q.items) // the new queue is empty, as the old queue is still full and over capacity + assert.Empty(t, *q.items.Load()) // the new queue is empty, as the old queue is still full and over capacity released = true releaseConsumers.Done() @@ -332,10 +332,10 @@ func BenchmarkBoundedQueue(b *testing.B) { } func BenchmarkBoundedQueueWithFactory(b *testing.B) { - q := NewBoundedQueue(1000, func( /* item */ any) {}) + q := NewBoundedQueue(1000, func( /* item */ int) {}) - q.StartConsumersWithFactory(10, func() Consumer { - return ConsumerFunc(func( /* item */ any) {}) + q.StartConsumersWithFactory(10, func() Consumer[int] { + return ConsumerFunc[int](func( /* item */ int) {}) }) defer q.Stop()