Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor collector pipeline to allow v1/v2 data model #6484

Merged
merged 8 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,12 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
},
})
require.NoError(t, err)
require.NoError(t, c.Close())

Expand Down
15 changes: 9 additions & 6 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
spanOptions processor.Details // common settings for all spans
tenancyMgr *tenancy.Manager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.Manager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
spanOptions: processor.Details{
InboundTransport: transport,
SpanFormat: spanFormat,
},
Expand All @@ -75,10 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
},
})
if err != nil {
if errors.Is(err, processor.ErrBusy) {
Expand Down
17 changes: 11 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -33,17 +34,21 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
// ProcessSpans records the spans and the batch metadata it receives, and
// returns the preconfigured expectedError. Only v1 (model.Span) batches are
// supported; a v2 batch panics.
//
// The returned slice has one entry per span in the current batch (all false),
// not one entry per span accumulated across calls.
func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
	p.mux.Lock()
	defer p.mux.Unlock()
	var received int
	batch.GetSpans(func(spans []*model.Span) {
		received = len(spans)
		p.spans = append(p.spans, spans...)
	}, func(_ ptrace.Traces) {
		panic("not implemented")
	})
	// Size the result by this batch only; p.spans keeps growing between calls.
	oks := make([]bool, received)
	if p.tenants == nil {
		p.tenants = make(map[string]bool)
	}
	p.tenants[batch.GetTenant()] = true
	p.transport = batch.GetInboundTransport()
	p.spanFormat = batch.GetSpanFormat()
	return oks, p.expectedError
}

Expand Down
18 changes: 12 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
},
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
Expand Down Expand Up @@ -105,9 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
},
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
Expand Down
14 changes: 11 additions & 3 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand Down Expand Up @@ -58,12 +59,19 @@ type shouldIErrorProcessor struct {

var errTestError = errors.New("Whoops")

func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
retMe := make([]bool, len(mSpans))
for i := range mSpans {
var spans []*model.Span
batch.GetSpans(func(sp []*model.Span) {
spans = sp
}, func(_ ptrace.Traces) {
panic("not implemented")
})

retMe := make([]bool, len(spans))
for i := range spans {
retMe[i] = true
}
return retMe, nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package app

import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span, tenant string)
type ProcessSpans func(spans processor.Batch)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {}
ret.preProcessSpans = func(_ processor.Batch) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}),
Options.PreProcessSpans(func(_ processor.Batch) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Expand All @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,11 @@ package processor

import (
"errors"
"io"

"github.com/jaegertracing/jaeger/model"
)

// ErrBusy signals that the processor cannot process incoming data.
var ErrBusy = errors.New("server busy")

// SpansOptions additional options passed to processor along with the spans.
type SpansOptions struct {
SpanFormat SpanFormat
InboundTransport InboundTransport
Tenant string
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error)
io.Closer
}

// InboundTransport identifies the transport used to receive spans.
type InboundTransport string

Expand Down
71 changes: 71 additions & 0 deletions cmd/collector/app/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package processor

import (
"io"

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// Compile-time assertions that both batch flavors implement Batch.
// All Batch methods on SpansV1, SpansV2 and the embedded Details use value
// receivers, so both the value and the pointer types satisfy the interface.
var (
_ Batch = (*SpansV1)(nil)
_ Batch = (*SpansV2)(nil)
)

// Batch is a batch of spans passed to the processor.
type Batch interface {
// GetSpans delegates to the appropriate function based on the data model version.
GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces))
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved

GetSpanFormat() SpanFormat
GetInboundTransport() InboundTransport
GetTenant() string
}

// SpanProcessor handles spans
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
type SpanProcessor interface {
// ProcessSpans processes spans and return with either a list of true/false success or an error
ProcessSpans(spans Batch) ([]bool, error)
io.Closer
}

// Details holds the metadata that accompanies a batch of spans.
// It is embedded in SpansV1 and SpansV2 and supplies their GetSpanFormat,
// GetInboundTransport and GetTenant methods.
type Details struct {
// SpanFormat is the format in which the spans were received.
SpanFormat SpanFormat
// InboundTransport identifies the transport used to receive the spans.
InboundTransport InboundTransport
// Tenant is the tenant the batch belongs to.
Tenant string
}

// SpansV1 is a batch of v1 data model spans (model.Span) passed to the
// processor, together with the Details describing how it was received.
type SpansV1 struct {
Spans []*model.Span
Details
}

// SpansV2 is a batch of v2 data model spans (ptrace.Traces) passed to the
// processor, together with the Details describing how it was received.
type SpansV2 struct {
Traces ptrace.Traces
Details
}
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved

// GetSpans passes the batch's v1 model spans to the first callback.
// The second (v2) callback is ignored and never invoked.
func (s SpansV1) GetSpans(visit func([]*model.Span), _ func(ptrace.Traces)) {
	visit(s.Spans)
}

// GetSpans passes the batch's v2 traces to the second callback.
// The first (v1) callback is ignored and never invoked.
func (s SpansV2) GetSpans(_ func([]*model.Span), visit func(ptrace.Traces)) {
	visit(s.Traces)
}

// GetSpanFormat returns the SpanFormat field.
func (d Details) GetSpanFormat() SpanFormat {
return d.SpanFormat
}

// GetInboundTransport returns the InboundTransport field.
func (d Details) GetInboundTransport() InboundTransport {
return d.InboundTransport
}

// GetTenant returns the Tenant field.
func (d Details) GetTenant() string {
return d.Tenant
}
66 changes: 66 additions & 0 deletions cmd/collector/app/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package processor

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// TestDetails checks that every Details getter returns its matching field.
func TestDetails(t *testing.T) {
	const tenant = "tenant"
	details := Details{
		Tenant:           tenant,
		InboundTransport: GRPCTransport,
		SpanFormat:       JaegerSpanFormat,
	}

	assert.Equal(t, JaegerSpanFormat, details.GetSpanFormat())
	assert.Equal(t, GRPCTransport, details.GetInboundTransport())
	assert.Equal(t, tenant, details.GetTenant())
}

// TestSpansV1 verifies that SpansV1 routes its spans to the v1 callback only
// and exposes the embedded Details through the Batch getters.
func TestSpansV1(t *testing.T) {
	batch := SpansV1{
		Spans: []*model.Span{{}},
		Details: Details{
			SpanFormat:       JaegerSpanFormat,
			InboundTransport: GRPCTransport,
			Tenant:           "tenant",
		},
	}
	var got []*model.Span
	// The callback parameter is named so it does not shadow the batch under test.
	batch.GetSpans(func(spans []*model.Span) {
		got = spans
	}, func(_ ptrace.Traces) {
		// Fail this test cleanly instead of panicking the whole test binary.
		t.Fatal("v2 callback must not be invoked for SpansV1")
	})
	assert.Equal(t, []*model.Span{{}}, got)
	assert.Equal(t, JaegerSpanFormat, batch.GetSpanFormat())
	assert.Equal(t, GRPCTransport, batch.GetInboundTransport())
	assert.Equal(t, "tenant", batch.GetTenant())
}

// TestSpansV2 verifies that SpansV2 routes its traces to the v2 callback only
// and exposes the embedded Details through the Batch getters.
func TestSpansV2(t *testing.T) {
	batch := SpansV2{
		Traces: ptrace.NewTraces(),
		Details: Details{
			SpanFormat:       JaegerSpanFormat,
			InboundTransport: GRPCTransport,
			Tenant:           "tenant",
		},
	}
	var got ptrace.Traces
	batch.GetSpans(func(_ []*model.Span) {
		// Fail this test cleanly instead of panicking the whole test binary.
		t.Fatal("v1 callback must not be invoked for SpansV2")
	}, func(traces ptrace.Traces) {
		// Named traces (not t) so *testing.T is not shadowed inside the closure.
		got = traces
	})
	// Compare against the batch's own input rather than a fresh, unrelated instance.
	assert.Equal(t, batch.Traces, got)
	assert.Equal(t, JaegerSpanFormat, batch.GetSpanFormat())
	assert.Equal(t, GRPCTransport, batch.GetInboundTransport())
	assert.Equal(t, "tenant", batch.GetTenant())
}
3 changes: 1 addition & 2 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -27,6 +26,6 @@ func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) {
// ProcessSpans is a no-op: it ignores the batch and returns an empty,
// non-nil result slice with a nil error.
func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) {
return []bool{}, nil
}
Loading
Loading