Skip to content

Commit

Permalink
Merge branch 'main' into es-archive-dependency
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Jan 11, 2025
2 parents 6137308 + 89c4ee4 commit 07c5976
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 89 deletions.
31 changes: 25 additions & 6 deletions CONTRIBUTING_GUIDELINES.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,36 @@ If you are new to GitHub's contribution workflow, we recommend the following set
* Go to the respective Jaeger repo on GitHub and create a fork using the button at the top. Select a destination org where you have write permissions (usually it is your personal "org").
* Clone the fork into your workspace.
* (Recommended): register upstream repo as remote
* After you clone your forked repo, `git remote -v` will show `origin`, e.g. `origin [email protected]:{username}/jaeger.git`
* Add `upstream` remote: `git remote add upstream [email protected]:jaegertracing/jaeger.git`
* Fetch it: `git fetch upstream main`
* Repoint your main branch: `git branch --set-upstream-to=upstream/main main`
* After you clone your forked repo, running below command
```bash
git remote -v
```
will show `origin`, e.g. `origin [email protected]:{username}/jaeger.git`
* Add `upstream` remote:
```bash
git remote add upstream [email protected]:jaegertracing/jaeger.git
```
* Fetch it:
```bash
git fetch upstream main
```
* Repoint your main branch:
```bash
git branch --set-upstream-to=upstream/main main
```
* With this setup, you will not need to keep your main branch in the fork in sync with the upstream repo.

Once you're ready to make changes:
* Create a new local branch (DO NOT make changes to `main`, it will cause CI errors).
* Commit your changes, making sure **each commit is signed**, e.g. `git commit -s ...` ([see below](#certificate-of-origin---sign-your-work)).
* Commit your changes, making sure **each commit is signed** ([see below](#certificate-of-origin---sign-your-work)):
```bash
git commit -s -m "Your commit message"
```
* You do not need to squash the commits, it will happen once the PR is merged into the official repo (but each individual commit must be signed).
* When satisfied, push the changes. Git will likely ask for upstream destination: `git push --set-upstream origin {branch-name}`.
* When satisfied, push the changes. Git will likely ask for upstream destination, so you push commits like this:
```bash
git push --set-upstream origin {branch-name}
```
* After you push, look for the output, it usually contains a URL to create a pull request.
Each PR should have:
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/queue"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand Down Expand Up @@ -237,7 +238,7 @@ func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.
}

// add format tag
span.Tags = append(span.Tags, model.String("internal.span.format", string(originalFormat)))
span.Tags = append(span.Tags, model.String(jptrace.FormatAttribute, string(originalFormat)))

item := queueItem{
queuedTime: time.Now(),
Expand Down
15 changes: 15 additions & 0 deletions internal/jptrace/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0
package jptrace

const (
// WarningsAttribute is the name of the span attribute where we can
// store various warnings produced from transformations,
// such as inbound sanitizers and outbound adjusters.
// The value type of the attribute is a string slice.
WarningsAttribute = "@jaeger@warnings"
// FormatAttribute is a key for span attribute that records the original
// wire format in which the span was received by Jaeger,
// e.g. proto, thrift, json.
FormatAttribute = "@jaeger@format"
)
8 changes: 0 additions & 8 deletions internal/jptrace/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
// WarningsAttribute is the name of the span attribute where we can
// store various warnings produced from transformations,
// such as inbound sanitizers and outbound adjusters.
// The value type of the attribute is a string slice.
WarningsAttribute = "jaeger.internal.warnings"
)

func AddWarnings(span ptrace.Span, warnings ...string) {
var w pcommon.Slice
if currWarnings, ok := span.Attributes().Get(WarningsAttribute); ok {
Expand Down
8 changes: 4 additions & 4 deletions internal/jptrace/warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestAddWarning(t *testing.T) {
span := ptrace.NewSpan()
attrs := span.Attributes()
if test.existing != nil {
warnings := attrs.PutEmptySlice("jaeger.internal.warnings")
warnings := attrs.PutEmptySlice(WarningsAttribute)
for _, warn := range test.existing {
warnings.AppendEmpty().SetStr(warn)
}
}
AddWarnings(span, test.newWarn)
warnings, ok := attrs.Get("jaeger.internal.warnings")
warnings, ok := attrs.Get(WarningsAttribute)
assert.True(t, ok)
assert.Equal(t, len(test.expected), warnings.Slice().Len())
for i, expectedWarn := range test.expected {
Expand All @@ -61,7 +61,7 @@ func TestAddWarning(t *testing.T) {
func TestAddWarning_MultipleWarnings(t *testing.T) {
span := ptrace.NewSpan()
AddWarnings(span, "warning-1", "warning-2")
warnings, ok := span.Attributes().Get("jaeger.internal.warnings")
warnings, ok := span.Attributes().Get(WarningsAttribute)
require.True(t, ok)
require.Equal(t, "warning-1", warnings.Slice().At(0).Str())
require.Equal(t, "warning-2", warnings.Slice().At(1).Str())
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestGetWarnings(t *testing.T) {
span := ptrace.NewSpan()
attrs := span.Attributes()
if test.existing != nil {
warnings := attrs.PutEmptySlice("jaeger.internal.warnings")
warnings := attrs.PutEmptySlice(WarningsAttribute)
for _, warn := range test.existing {
warnings.AppendEmpty().SetStr(warn)
}
Expand Down
50 changes: 24 additions & 26 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

metricsFactory metrics.Factory
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.Configuration
archiveConfig *config.Configuration
Expand Down Expand Up @@ -141,20 +139,6 @@ func (f *Factory) configureFromOptions(o *Options) {
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory = metricsFactory
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.logger = logger

primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
Expand Down Expand Up @@ -219,11 +203,11 @@ func NewSession(c *config.Configuration) (cassandra.Session, error) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
sr, err := cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
sr, err := cSpanStore.NewSpanReader(f.primarySession, f.metricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return nil, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
return spanstoremetrics.NewReaderDecorator(sr, f.metricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -232,25 +216,32 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.primaryMetricsFactory, f.logger, options...)
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger, options...)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
version := cDepStore.GetDependencyVersion(f.primarySession)
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version)
return cDepStore.NewDependencyStore(f.primarySession, f.metricsFactory, f.logger, version)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
sr, err := cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
sr, err := cSpanStore.NewSpanReader(f.archiveSession, archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return nil, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
return spanstoremetrics.NewReaderDecorator(sr, archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand All @@ -262,7 +253,14 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if err != nil {
return nil, err
}
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.archiveMetricsFactory, f.logger, options...)
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, archiveMetricsFactory, f.logger, options...)
}

// CreateLock implements storage.SamplingStoreFactory
Expand Down
59 changes: 31 additions & 28 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

Expand Down Expand Up @@ -131,20 +130,7 @@ func (f *Factory) configureFromOptions(o *Options) {

// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.metricsFactory = metricsFactory
f.logger = logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
Expand Down Expand Up @@ -200,12 +186,12 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
if err != nil {
return nil, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
return spanstoremetrics.NewReaderDecorator(sr, f.metricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.primaryMetricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.metricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -226,19 +212,29 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if err != nil {
return nil, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return spanstoremetrics.NewReaderDecorator(sr, archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
writeAlias := "archive"
if f.archiveConfig.UseReadWriteAliases {
writeAlias += "-write"
}
return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, writeAlias, true)
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAlias, true)

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / spm (v1, all-in-one)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / grpc / grpc (v1)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / badger / badger (v1)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / hotrod (docker, v1)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / kafka / kafka 3.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / spm (v2, jaeger)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / docker-images

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / crossdock

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-linux-s390x

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-linux-arm64

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / all-in-one (v1)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-linux-ppc64le

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / hotrod (docker, v2)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-linux-amd64

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-windows-amd64

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-darwin-amd64

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / build-binaries-darwin-arm64

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / hotrod (k8s, v1)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / all-in-one (v2)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / hotrod (k8s, v2)

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / elasticsearch / elasticsearch 6.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / elasticsearch / elasticsearch 8.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / elasticsearch / elasticsearch 7.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / opensearch / opensearch 1.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / opensearch / opensearch 2.x v1

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / lint

undefined: writeAlias) (typecheck)

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / cassandra / cassandra-4.x v1 schema=manual

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / cassandra / cassandra-5.x v1 schema=manual

undefined: writeAlias

Check failure on line 237 in plugin/storage/es/factory.go

View workflow job for this annotation

GitHub Actions / unit-tests

undefined: writeAlias
}

func createSpanReader(
Expand Down Expand Up @@ -384,11 +380,18 @@ func (f *Factory) Close() error {
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient, f.primaryMetricsFactory)
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient, f.metricsFactory)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient, f.archiveMetricsFactory)
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient, archiveMetricsFactory)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client], mf metrics.Factory) {
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
Name: "storage",
Tags: map[string]string{
"kind": kind,
"role": "primary", // can be overiden in the storage factory for archive/sampling stores
},
})
if err := factory.Initialize(mf, logger); err != nil {
Expand Down
9 changes: 1 addition & 8 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,7 @@ func (f *Factory) newRemoteStorage(

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
primaryMetricsFactory := f.telset.Metrics.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
return spanstoremetrics.NewReaderDecorator(f.services.Store.SpanReader(), primaryMetricsFactory), nil
return spanstoremetrics.NewReaderDecorator(f.services.Store.SpanReader(), f.telset.Metrics), nil
}

// CreateSpanWriter implements storage.Factory
Expand Down
9 changes: 1 addition & 8 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
primaryMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
return spanstoremetrics.NewReaderDecorator(f.store, primaryMetricsFactory), nil
return spanstoremetrics.NewReaderDecorator(f.store, f.metricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand Down

0 comments on commit 07c5976

Please sign in to comment.