Skip to content

Commit

Permalink
Upgrade storage integration test to v2 Trace Reader (jaegertracing#6388)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of jaegertracing#6366

## Description of the changes
- Incrementally swaps the fields of `StorageIntegration` to align with
v2 storage api while supporting v1 api
- Updates test functions accordingly to work with the updated fields

## How was this change tested?
- make test

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Emmanuel Emonueje Ebenezer <[email protected]>
Signed-off-by: Ebenezer <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
2 people authored and Manik2708 committed Jan 5, 2025
1 parent 9c60046 commit fd28541
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 33 deletions.
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const otlpPort = 4317
Expand Down Expand Up @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
spanReader, err := createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
Expand Down Expand Up @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanReader.(io.Closer).Close())
spanReader, err := v1adapter.GetV1Reader(s.TraceReader)
require.NoError(t, err)
require.NoError(t, spanReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/tailsampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) {
var actual []string
assert.Eventually(t, func() bool {
var err error
actual, err = ts.SpanReader.GetServices(context.Background())
actual, err = ts.TraceReader.GetServices(context.Background())
require.NoError(t, err)
sort.Strings(actual)
return assert.ObjectsAreEqualValues(ts.expectedServices, actual)
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type BadgerIntegrationStorage struct {
Expand All @@ -35,8 +36,9 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanWriter, err = s.factory.CreateSpanWriter()
require.NoError(t, err)

s.SpanReader, err = s.factory.CreateSpanReader()
spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type CassandraStorageIntegration struct {
Expand Down Expand Up @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type GRPCStorageIntegrationTestSuite struct {
Expand All @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {

s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
56 changes: 34 additions & 22 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

//go:embed fixtures
Expand All @@ -42,7 +44,7 @@ var fixtures embed.FS
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
Expand Down Expand Up @@ -79,7 +81,7 @@ type StorageIntegration struct {
// the service name is formatted "query##-service".
type QueryFixtures struct {
Caption string
Query *spanstore.TraceQueryParameters
Query *tracestore.TraceQueryParams
ExpectedFixtures []string
}

Expand Down Expand Up @@ -143,7 +145,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
var actual []string
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetServices(context.Background())
actual, err = s.TraceReader.GetServices(context.Background())
if err != nil {
t.Log(err)
return false
Expand All @@ -154,9 +156,10 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
// If the storage backend returns more services than expected, let's log traces for those
t.Log("🛑 Found unexpected services!")
for _, service := range actual {
traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
continue
Expand Down Expand Up @@ -214,10 +217,13 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
expected := s.writeLargeTraceWithDuplicateSpanIds(t)
expectedTraceID := expected.Spans[0].TraceID

var actual *model.Trace
actual := &model.Trace{} // no spans
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if len(traces) > 0 {
actual = traces[0]
}
return err == nil && len(actual.Spans) >= len(expected.Spans)
})

Expand All @@ -242,27 +248,27 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)

var expected []spanstore.Operation
var expected []tracestore.Operation
if s.GetOperationsMissingSpanKind {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1"},
{Name: "example-operation-3"},
{Name: "example-operation-4"},
}
} else {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1", SpanKind: ""},
{Name: "example-operation-3", SpanKind: "server"},
{Name: "example-operation-4", SpanKind: "client"},
}
}
s.loadParseAndWriteExampleTrace(t)

var actual []spanstore.Operation
var actual []tracestore.Operation
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetOperations(context.Background(),
spanstore.OperationQueryParameters{ServiceName: "example-service-1"})
actual, err = s.TraceReader.GetOperations(context.Background(),
tracestore.OperationQueryParameters{ServiceName: "example-service-1"})
if err != nil {
t.Log(err)
return false
Expand All @@ -287,24 +293,29 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {
expected := s.loadParseAndWriteExampleTrace(t)
expectedTraceID := expected.Spans[0].TraceID

var actual *model.Trace
actual := &model.Trace{} // no spans
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
return false
}
if len(traces) > 0 {
actual = traces[0]
}
return err == nil && len(actual.Spans) == len(expected.Spans)
return len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
CompareTraces(t, expected, actual)
}

t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := model.TraceID{High: 0, Low: 1}
trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID})
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, trace)
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
require.NoError(t, err) // v2 TraceReader no longer returns an error for not found
assert.Empty(t, traces)
})
}

Expand Down Expand Up @@ -342,11 +353,12 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
}
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace {
var traces []*model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
traces, err = s.SpanReader.FindTraces(context.Background(), query)
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.V1TracesFromSeq2(iterTraces)
if err != nil {
t.Log(err)
return false
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const defaultLocalKafkaBroker = "127.0.0.1:9092"
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
spanConsumer.Start()

s.SpanWriter = spanWriter
s.SpanReader = &ingester{traceStore}
spanReader := &ingester{traceStore}
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.CleanUp = func(_ *testing.T) {}
s.SkipArchiveTest = true
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type MemStorageIntegrationTestSuite struct {
Expand All @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.SpanReader = store
spanReader := store
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.SpanWriter = store
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore
Expand Down
49 changes: 47 additions & 2 deletions storage_v2/v1adapter/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ import (

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
)

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// V1BatchesFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
func V1BatchesFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarningsToModelSpans(traces, spanMap)
return batches
}

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
//
// TODO remove this function in favor of V1BatchesFromTraces
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
return V1BatchesFromTraces(traces)
}

// V1BatchesToTraces converts Jaeger model batches ([]*model.Batch)
// to OpenTelemetry traces (ptrace.Traces).
func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces {
Expand All @@ -32,6 +41,42 @@ func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces {
return traces
}

// V1TracesFromSeq2 converts an interator of ptrace.Traces chunks into v1 traces.
func V1TracesFromSeq2(otelSeq iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
var (
jaegerTraces []*model.Trace
iterErr error
)
jptrace.AggregateTraces(otelSeq)(func(otelTrace ptrace.Traces, err error) bool {
if err != nil {
iterErr = err
return false
}
jaegerTraces = append(jaegerTraces, modelTraceFromOtelTrace(otelTrace))
return true
})
if iterErr != nil {
return nil, iterErr
}
return jaegerTraces, nil
}

// modelTraceFromOtelTrace extracts spans from otel traces
func modelTraceFromOtelTrace(otelTrace ptrace.Traces) *model.Trace {
var spans []*model.Span
batches := V1BatchesFromTraces(otelTrace)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
proc := *batch.Process // shallow clone
span.Process = &proc
}
spans = append(spans, span)
}
}
return &model.Trace{Spans: spans}
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand Down
Loading

0 comments on commit fd28541

Please sign in to comment.