Skip to content

Commit

Permalink
[v2][query] Fix query service to not perform aggregation for raw trac…
Browse files Browse the repository at this point in the history
…es (#6453)

## Which problem is this PR solving?
- Towards #6337

## Description of the changes
- The query service was currently performing aggregation when the
`raw_traces` flag is set. This PR fixes that by removing aggregation
when the flag is set.

## How was this change tested?
- Adding unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
2 people authored and Manik2708 committed Jan 2, 2025
1 parent 8eae010 commit e6d0a69
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 12 deletions.
31 changes: 21 additions & 10 deletions cmd/query/app/querysvc/v2/querysvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,23 +181,34 @@ func (qs QueryService) receiveTraces(
yield func([]ptrace.Traces, error) bool,
rawTraces bool,
) (map[pcommon.TraceID]struct{}, bool) {
aggregatedTraces := jptrace.AggregateTraces(seq)
foundTraceIDs := make(map[pcommon.TraceID]struct{})
proceed := true
aggregatedTraces(func(trace ptrace.Traces, err error) bool {

processTraces := func(traces []ptrace.Traces, err error) bool {
if err != nil {
proceed = yield(nil, err)
return proceed
}
if !rawTraces {
qs.options.Adjuster.Adjust(trace)
for _, trace := range traces {
if !rawTraces {
qs.options.Adjuster.Adjust(trace)
}
jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool {
foundTraceIDs[span.TraceID()] = struct{}{}
return true
})
}
jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool {
foundTraceIDs[span.TraceID()] = struct{}{}
return true
})
proceed = yield([]ptrace.Traces{trace}, nil)
proceed = yield(traces, nil)
return proceed
})
}

if rawTraces {
seq(processTraces)
} else {
jptrace.AggregateTraces(seq)(func(trace ptrace.Traces, err error) bool {
return processTraces([]ptrace.Traces{trace}, err)
})
}

return foundTraceIDs, proceed
}
143 changes: 141 additions & 2 deletions cmd/query/app/querysvc/v2/querysvc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
Expand Down Expand Up @@ -58,6 +59,12 @@ func withArchiveTraceWriter() testOption {
}
}

func withAdjuster(adj adjuster.Adjuster) testOption {
return func(_ *testQueryService, options *QueryServiceOptions) {
options.Adjuster = adj
}
}

func initializeTestService(opts ...testOption) *testQueryService {
traceReader := &tracestoremocks.Reader{}
dependencyStorage := &depstoremocks.Reader{}
Expand Down Expand Up @@ -89,7 +96,6 @@ func makeTestTrace() ptrace.Traces {
spanB := scopes.Spans().AppendEmpty()
spanB.SetTraceID(testTraceID)
spanB.SetSpanID(pcommon.SpanID([8]byte{2}))
spanB.Attributes()

return trace
}
Expand Down Expand Up @@ -295,7 +301,7 @@ func TestFindTraces_Success(t *testing.T) {
require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID())
}

func TestFindTraces_WithRawTraces(t *testing.T) {
func TestFindTraces_WithRawTraces_PerformsAdjustment(t *testing.T) {
tests := []struct {
rawTraces bool
attributes pcommon.Map
Expand Down Expand Up @@ -376,6 +382,139 @@ func TestFindTraces_WithRawTraces(t *testing.T) {
}
}

func TestFindTraces_WithRawTraces_PerformsAggregation(t *testing.T) {
tests := []struct {
rawTraces bool
traces []ptrace.Traces
expected []ptrace.Traces
expectedAdjustCalls int
}{
{
rawTraces: true,
traces: func() []ptrace.Traces {
traceA := ptrace.NewTraces()
resourcesA := traceA.ResourceSpans().AppendEmpty()
scopesA := resourcesA.ScopeSpans().AppendEmpty()
spanA := scopesA.Spans().AppendEmpty()
spanA.SetTraceID(testTraceID)
spanA.SetName("spanA")
spanA.SetSpanID(pcommon.SpanID([8]byte{1}))

traceB := ptrace.NewTraces()
resourcesB := traceB.ResourceSpans().AppendEmpty()
scopesB := resourcesB.ScopeSpans().AppendEmpty()
spanB := scopesB.Spans().AppendEmpty()
spanB.SetTraceID(testTraceID)
spanB.SetName("spanB")
spanB.SetSpanID(pcommon.SpanID([8]byte{2}))

return []ptrace.Traces{traceA, traceB}
}(),
expected: func() []ptrace.Traces {
traceA := ptrace.NewTraces()
resourcesA := traceA.ResourceSpans().AppendEmpty()
scopesA := resourcesA.ScopeSpans().AppendEmpty()
spanA := scopesA.Spans().AppendEmpty()
spanA.SetTraceID(testTraceID)
spanA.SetName("spanA")
spanA.SetSpanID(pcommon.SpanID([8]byte{1}))

traceB := ptrace.NewTraces()
resourcesB := traceB.ResourceSpans().AppendEmpty()
scopesB := resourcesB.ScopeSpans().AppendEmpty()
spanB := scopesB.Spans().AppendEmpty()
spanB.SetTraceID(testTraceID)
spanB.SetName("spanB")
spanB.SetSpanID(pcommon.SpanID([8]byte{2}))

return []ptrace.Traces{traceA, traceB}
}(),
expectedAdjustCalls: 0,
},
{
rawTraces: false,
traces: func() []ptrace.Traces {
traceA := ptrace.NewTraces()
resourcesA := traceA.ResourceSpans().AppendEmpty()
scopesA := resourcesA.ScopeSpans().AppendEmpty()
spanA := scopesA.Spans().AppendEmpty()
spanA.SetTraceID(testTraceID)
spanA.SetSpanID(pcommon.SpanID([8]byte{1}))

traceB := ptrace.NewTraces()
resourcesB := traceB.ResourceSpans().AppendEmpty()
scopesB := resourcesB.ScopeSpans().AppendEmpty()
spanB := scopesB.Spans().AppendEmpty()
spanB.SetTraceID(testTraceID)
spanB.SetSpanID(pcommon.SpanID([8]byte{2}))

return []ptrace.Traces{traceA, traceB}
}(),
expected: func() []ptrace.Traces {
traceA := ptrace.NewTraces()
resourcesA := traceA.ResourceSpans().AppendEmpty()
scopesA := resourcesA.ScopeSpans().AppendEmpty()
spanA := scopesA.Spans().AppendEmpty()
spanA.SetTraceID(testTraceID)
spanA.SetSpanID(pcommon.SpanID([8]byte{1}))

resourcesB := ptrace.NewResourceSpans()
scopesB := resourcesB.ScopeSpans().AppendEmpty()
spanB := scopesB.Spans().AppendEmpty()
spanB.SetTraceID(testTraceID)
spanB.SetSpanID(pcommon.SpanID([8]byte{2}))

resourcesB.CopyTo(traceA.ResourceSpans().AppendEmpty())

return []ptrace.Traces{traceA}
}(),
// even though there are 2 input chunks, they are for the same trace,
// so we expect only 1 call to Adjuster.
expectedAdjustCalls: 1,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) {
responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
yield(test.traces, nil)
})
adjustCalls := 0
adj := adjuster.Func(func(_ ptrace.Traces) {
adjustCalls++
})

tqs := initializeTestService(withAdjuster(adj))
duration, err := time.ParseDuration("20ms")
require.NoError(t, err)
now := time.Now()
tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
StartTimeMax: now,
DurationMin: duration,
NumTraces: 200,
}).
Return(responseIter).Once()

query := TraceQueryParams{
TraceQueryParams: tracestore.TraceQueryParams{
ServiceName: "service",
OperationName: "operation",
StartTimeMax: now,
DurationMin: duration,
NumTraces: 200,
},
RawTraces: test.rawTraces,
}
getTracesIter := tqs.queryService.FindTraces(context.Background(), query)
gotTraces, err := iter.FlattenWithErrors(getTracesIter)
require.NoError(t, err)
assert.Equal(t, test.expected, gotTraces)
assert.Equal(t, test.expectedAdjustCalls, adjustCalls)
})
}
}

func TestArchiveTrace(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit e6d0a69

Please sign in to comment.