Skip to content

Commit

Permalink
Change collector's queue to use generics (#6486)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Continuation of #6474

## Description of the changes
- In order to allow the queue to carry both v1 and v2 data model, let's
first make the queue strongly typed by using generics

## How was this change tested?
- unit tests, CI

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 6, 2025
1 parent b1153a0 commit 232d805
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 59 deletions.
17 changes: 8 additions & 9 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
)

type spanProcessor struct {
queue *queue.BoundedQueue
queue *queue.BoundedQueue[queueItem]
queueResizeMu sync.Mutex
metrics *SpanProcessorMetrics
preProcessSpans ProcessSpans
Expand Down Expand Up @@ -63,9 +63,8 @@ func NewSpanProcessor(
) processor.SpanProcessor {
sp := newSpanProcessor(spanWriter, additional, opts...)

sp.queue.StartConsumers(sp.numWorkers, func(item any) {
value := item.(*queueItem)
sp.processItemFromQueue(value)
sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) {
sp.processItemFromQueue(item)
})

sp.background(1*time.Second, sp.updateGauges)
Expand All @@ -83,13 +82,13 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt
options.serviceMetrics,
options.hostMetrics,
options.extraFormatTypes)
droppedItemHandler := func(item any) {
droppedItemHandler := func(item queueItem) {
handlerMetrics.SpansDropped.Inc(1)
if options.onDroppedSpan != nil {
options.onDroppedSpan(item.(*queueItem).span)
options.onDroppedSpan(item.span)
}
}
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)
boundedQueue := queue.NewBoundedQueue[queueItem](options.queueSize, droppedItemHandler)

sanitizers := sanitizer.NewStandardSanitizers()
if options.sanitizer != nil {
Expand Down Expand Up @@ -196,7 +195,7 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
return retMe, nil
}

func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
func (sp *spanProcessor) processItemFromQueue(item queueItem) {
// TODO calling sanitizer here contradicts the comment in enqueueSpan about immutable Process.
sp.processSpan(sp.sanitizer(item.span), item.tenant)
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
Expand Down Expand Up @@ -237,7 +236,7 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.
// add format tag
span.Tags = append(span.Tags, model.String("internal.span.format", string(originalFormat)))

item := &queueItem{
item := queueItem{
queuedTime: time.Now(),
span: span,
tenant: tenant,
Expand Down
63 changes: 31 additions & 32 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,49 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/jaegertracing/jaeger/pkg/metrics"
)

// Consumer consumes data from a bounded queue
type Consumer interface {
Consume(item any)
type Consumer[T any] interface {
Consume(item T)
}

// BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer force the earliest items to be dropped. The implementation is actually based on
// channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
// the items from the top of the queue until its size drops back to maxSize
type BoundedQueue struct {
type BoundedQueue[T any] struct {
workers int
stopWG sync.WaitGroup
size atomic.Int32
capacity atomic.Uint32
stopped atomic.Uint32
items *chan any
onDroppedItem func(item any)
factory func() Consumer
items atomic.Pointer[chan T]
onDroppedItem func(item T)
factory func() Consumer[T]
stopCh chan struct{}
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedQueue(capacity int, onDroppedItem func(item any)) *BoundedQueue {
queue := make(chan any, capacity)
bq := &BoundedQueue{
func NewBoundedQueue[T any](capacity int, onDroppedItem func(item T)) *BoundedQueue[T] {
queue := make(chan T, capacity)
bq := &BoundedQueue[T]{
onDroppedItem: onDroppedItem,
items: &queue,
stopCh: make(chan struct{}),
}
bq.items.Store(&queue)
//nolint: gosec // G115
bq.capacity.Store(uint32(capacity))
return bq
}

// StartConsumersWithFactory creates a given number of consumers consuming items
// from the queue in separate goroutines.
func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consumer) {
func (q *BoundedQueue[T]) StartConsumersWithFactory(num int, factory func() Consumer[T]) {
q.workers = num
q.factory = factory
var startWG sync.WaitGroup
Expand All @@ -62,10 +61,10 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
startWG.Done()
defer q.stopWG.Done()
consumer := q.factory()
queue := *q.items
queue := q.items.Load()
for {
select {
case item, ok := <-queue:
case item, ok := <-*queue:
if ok {
q.size.Add(-1)
consumer.Consume(item)
Expand All @@ -85,23 +84,23 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume

// ConsumerFunc is an adapter to allow the use of
// a consume function callback as a Consumer.
type ConsumerFunc func(item any)
type ConsumerFunc[T any] func(item T)

// Consume calls c(item)
func (c ConsumerFunc) Consume(item any) {
func (c ConsumerFunc[T]) Consume(item T) {
c(item)
}

// StartConsumers starts a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
func (q *BoundedQueue) StartConsumers(num int, callback func(item any)) {
q.StartConsumersWithFactory(num, func() Consumer {
return ConsumerFunc(callback)
func (q *BoundedQueue[T]) StartConsumers(num int, callback func(item T)) {
q.StartConsumersWithFactory(num, func() Consumer[T] {
return ConsumerFunc[T](callback)
})
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *BoundedQueue) Produce(item any) bool {
func (q *BoundedQueue[T]) Produce(item T) bool {
if q.stopped.Load() != 0 {
q.onDroppedItem(item)
return false
Expand All @@ -117,8 +116,9 @@ func (q *BoundedQueue) Produce(item any) bool {
}

q.size.Add(1)
queue := q.items.Load()
select {
case *q.items <- item:
case *queue <- item:
return true
default:
// should not happen, as overflows should have been captured earlier
Expand All @@ -132,26 +132,26 @@ func (q *BoundedQueue) Produce(item any) bool {

// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
func (q *BoundedQueue) Stop() {
func (q *BoundedQueue[T]) Stop() {
q.stopped.Store(1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(*q.items)
close(*q.items.Load())
}

// Size returns the current size of the queue
func (q *BoundedQueue) Size() int {
func (q *BoundedQueue[T]) Size() int {
return int(q.size.Load())
}

// Capacity returns capacity of the queue
func (q *BoundedQueue) Capacity() int {
func (q *BoundedQueue[T]) Capacity() int {
return int(q.capacity.Load())
}

// StartLengthReporting starts a timer-based goroutine that periodically reports
// current queue length to a given metrics gauge.
func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge) {
func (q *BoundedQueue[T]) StartLengthReporting(reportPeriod time.Duration, gauge metrics.Gauge) {
ticker := time.NewTicker(reportPeriod)
go func() {
defer ticker.Stop()
Expand All @@ -168,24 +168,23 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me
}

// Resize changes the capacity of the queue, returning whether the action was successful
func (q *BoundedQueue) Resize(capacity int) bool {
func (q *BoundedQueue[T]) Resize(capacity int) bool {
if capacity == q.Capacity() {
// noop
return false
}

previous := *q.items
queue := make(chan any, capacity)
previous := q.items.Load()
queue := make(chan T, capacity)

// swap queues
// #nosec
swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue))
swapped := q.items.CompareAndSwap(previous, &queue)
if swapped {
// start a new set of consumers, based on the information given previously
q.StartConsumersWithFactory(q.workers, q.factory)

// gracefully drain the existing queue
close(previous)
close(*previous)

// update the capacity
//nolint: gosec // G115
Expand Down
36 changes: 18 additions & 18 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(item any))) {
func helper(t *testing.T, startConsumers func(q *BoundedQueue[string], consumerFn func(item string))) {
mFact := metricstest.NewFactory(0)
counter := mFact.Counter(metrics.Options{Name: "dropped", Tags: nil})
gauge := mFact.Gauge(metrics.Options{Name: "size", Tags: nil})

q := NewBoundedQueue(1, func( /* item */ any) {
q := NewBoundedQueue[string](1, func( /* item */ string) {
counter.Inc(1)
})
assert.Equal(t, 1, q.Capacity())
Expand All @@ -38,8 +38,8 @@ func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(i
startLock.Lock() // block consumers
consumerState := newConsumerState(t)

startConsumers(q, func(item any) {
consumerState.record(item.(string))
startConsumers(q, func(item string) {
consumerState.record(item)

// block further processing until startLock is released
startLock.Lock()
Expand Down Expand Up @@ -102,14 +102,14 @@ func helper(t *testing.T, startConsumers func(q *BoundedQueue, consumerFn func(i
}

func TestBoundedQueue(t *testing.T) {
helper(t, func(q *BoundedQueue, consumerFn func(item any)) {
helper(t, func(q *BoundedQueue[string], consumerFn func(item string)) {
q.StartConsumers(1, consumerFn)
})
}

func TestBoundedQueueWithFactory(t *testing.T) {
helper(t, func(q *BoundedQueue, consumerFn func(item any)) {
q.StartConsumersWithFactory(1, func() Consumer { return ConsumerFunc(consumerFn) })
helper(t, func(q *BoundedQueue[string], consumerFn func(item string)) {
q.StartConsumersWithFactory(1, func() Consumer[string] { return ConsumerFunc[string](consumerFn) })
})
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
}

func TestResizeUp(t *testing.T) {
q := NewBoundedQueue(2, func(item any) {
q := NewBoundedQueue(2, func(item string) {
fmt.Printf("dropped: %v\n", item)
})

Expand All @@ -173,7 +173,7 @@ func TestResizeUp(t *testing.T) {
releaseConsumers.Add(1)

released, resized := false, false
q.StartConsumers(1, func( /* item */ any) {
q.StartConsumers(1, func( /* item */ string) {
if !resized { // we'll have a second consumer once the queue is resized
// signal that the worker is processing
firstConsumer.Done()
Expand All @@ -194,7 +194,7 @@ func TestResizeUp(t *testing.T) {
assert.False(t, q.Produce("d")) // dropped
assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.Len(t, *q.items, q.Capacity())
assert.Len(t, *q.items.Load(), q.Capacity())

resized = true
assert.True(t, q.Resize(4))
Expand All @@ -207,14 +207,14 @@ func TestResizeUp(t *testing.T) {

assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size()) // the combined queues are at the capacity right now
assert.Len(t, *q.items, 2) // the new internal queue should have two items only
assert.Len(t, *q.items.Load(), 2) // the new internal queue should have two items only

released = true
releaseConsumers.Done()
}

func TestResizeDown(t *testing.T) {
q := NewBoundedQueue(4, func(item any) {
q := NewBoundedQueue(4, func(item string) {
fmt.Printf("dropped: %v\n", item)
})

Expand All @@ -223,7 +223,7 @@ func TestResizeDown(t *testing.T) {
releaseConsumers.Add(1)

released := false
q.StartConsumers(1, func( /* item */ any) {
q.StartConsumers(1, func( /* item */ string) {
// once we release the lock, we might end up with multiple calls to reach this
if !released {
// signal that the worker is processing
Expand All @@ -244,14 +244,14 @@ func TestResizeDown(t *testing.T) {
assert.True(t, q.Produce("e")) // dropped
assert.EqualValues(t, 4, q.Capacity())
assert.EqualValues(t, q.Capacity(), q.Size())
assert.Len(t, *q.items, q.Capacity())
assert.Len(t, *q.items.Load(), q.Capacity())

assert.True(t, q.Resize(2))
assert.False(t, q.Produce("f")) // dropped

assert.EqualValues(t, 2, q.Capacity())
assert.EqualValues(t, 4, q.Size()) // the queue will eventually drain, but it will live for a while over capacity
assert.Empty(t, *q.items) // the new queue is empty, as the old queue is still full and over capacity
assert.Empty(t, *q.items.Load()) // the new queue is empty, as the old queue is still full and over capacity

released = true
releaseConsumers.Done()
Expand Down Expand Up @@ -332,10 +332,10 @@ func BenchmarkBoundedQueue(b *testing.B) {
}

func BenchmarkBoundedQueueWithFactory(b *testing.B) {
q := NewBoundedQueue(1000, func( /* item */ any) {})
q := NewBoundedQueue(1000, func( /* item */ int) {})

q.StartConsumersWithFactory(10, func() Consumer {
return ConsumerFunc(func( /* item */ any) {})
q.StartConsumersWithFactory(10, func() Consumer[int] {
return ConsumerFunc[int](func( /* item */ int) {})
})
defer q.Stop()

Expand Down

0 comments on commit 232d805

Please sign in to comment.