From 8a385c22e5f7e05d790cd992baafcb31ea42db9a Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Mon, 16 Oct 2023 11:12:50 -0700 Subject: [PATCH] [pdata] Enable the pdata mutation safeguards in the fanout consumers (#8634) This change enables the runtime assertions to catch unintentional pdata mutations in components claiming as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by unrelated components, making it very difficult to troubleshoot. This required introducing extra API to get the pdata mutability state: - p[metric|trace|log].[Metrics|Traces|Logs].IsReadOnly() Resolves: https://github.com/open-telemetry/opentelemetry-collector/issues/6794 --- .chloggen/add-is-read-only.yaml | 18 ++++++ .chloggen/enable-mutation-assertions.yaml | 26 +++++++++ internal/fanoutconsumer/logs.go | 68 ++++++++++++----------- internal/fanoutconsumer/logs_test.go | 55 ++++++++++++++++-- internal/fanoutconsumer/metrics.go | 68 ++++++++++++----------- internal/fanoutconsumer/metrics_test.go | 55 ++++++++++++++++-- internal/fanoutconsumer/traces.go | 68 ++++++++++++----------- internal/fanoutconsumer/traces_test.go | 56 +++++++++++++++++-- pdata/plog/logs.go | 9 +++ pdata/plog/logs_test.go | 2 + pdata/pmetric/metrics.go | 9 +++ pdata/pmetric/metrics_test.go | 2 + pdata/ptrace/traces.go | 9 +++ pdata/ptrace/traces_test.go | 2 + service/internal/graph/graph_test.go | 18 +++++- 15 files changed, 351 insertions(+), 114 deletions(-) create mode 100644 .chloggen/add-is-read-only.yaml create mode 100644 .chloggen/enable-mutation-assertions.yaml diff --git a/.chloggen/add-is-read-only.yaml b/.chloggen/add-is-read-only.yaml new file mode 100644 index 00000000000..c4af586b265 --- /dev/null +++ b/.chloggen/add-is-read-only.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pdata + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add IsReadOnly() method to p[metrics|logs|traces].[Metrics|Logs|Spans] pdata structs allowing to check if the struct is read-only. + +# One or more tracking issues or pull requests related to the change +issues: [6794] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/enable-mutation-assertions.yaml b/.chloggen/enable-mutation-assertions.yaml new file mode 100644 index 00000000000..eec8f892160 --- /dev/null +++ b/.chloggen/enable-mutation-assertions.yaml @@ -0,0 +1,26 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: fanoutconsumer + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable runtime assertions to catch incorrect pdata mutations in the components claiming as non-mutating pdata. + +# One or more tracking issues or pull requests related to the change +issues: [6794] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change enables the runtime assertions to catch unintentional pdata mutations in components that are claimed + as non-mutating pdata. Without these assertions, runtime errors may still occur, but thrown by unrelated components, + making it very difficult to troubleshoot. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index 9b07bbc6377..e0f9f88df4c 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -20,36 +20,22 @@ import ( // NewLogs wraps multiple log consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewLogs(lcs []consumer.Logs) consumer.Logs { - if len(lcs) == 1 { - // Don't wrap if no need to do it. - return lcs[0] - } - var pass []consumer.Logs - var clone []consumer.Logs - for i := 0; i < len(lcs)-1; i++ { - if !lcs[i].Capabilities().MutatesData { - pass = append(pass, lcs[i]) + lc := &logsConsumer{} + for i := 0; i < len(lcs); i++ { + if lcs[i].Capabilities().MutatesData { + lc.mutable = append(lc.mutable, lcs[i]) } else { - clone = append(clone, lcs[i]) + lc.readonly = append(lc.readonly, lcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !lcs[len(lcs)-1].Capabilities().MutatesData { - pass = append(pass, lcs[len(lcs)-1]) - } else { - clone = append(clone, lcs[len(lcs)-1]) - } - return &logsConsumer{pass: pass, clone: clone} + return lc } type logsConsumer struct { - pass []consumer.Logs - clone []consumer.Logs + mutable []consumer.Logs + readonly []consumer.Logs } func (lsc *logsConsumer) Capabilities() consumer.Capabilities { @@ -59,20 +45,40 @@ func (lsc *logsConsumer) Capabilities() consumer.Capabilities { // ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one. func (lsc *logsConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, lc := range lsc.clone { - clonedLogs := plog.NewLogs() - ld.CopyTo(clonedLogs) - errs = multierr.Append(errs, lc.ConsumeLogs(ctx, clonedLogs)) + + if len(lsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(lsc.mutable)-1; i++ { + errs = multierr.Append(errs, lsc.mutable[i].ConsumeLogs(ctx, cloneLogs(ld))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := lsc.mutable[len(lsc.mutable)-1] + if len(lsc.readonly) == 0 && !ld.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, ld)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeLogs(ctx, cloneLogs(ld))) + } } - for _, lc := range lsc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(lsc.readonly) > 1 && !ld.IsReadOnly() { + ld.MarkReadOnly() + } + for _, lc := range lsc.readonly { errs = multierr.Append(errs, lc.ConsumeLogs(ctx, ld)) } + return errs } +func cloneLogs(ld plog.Logs) plog.Logs { + clonedLogs := plog.NewLogs() + ld.CopyTo(clonedLogs) + return clonedLogs +} + var _ connector.LogsRouter = (*logsRouter)(nil) type logsRouter struct { diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index 1db046b854d..5ae01b6b423 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" ) -func TestLogsNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - lfc := NewLogs([]consumer.Logs{nop}) - assert.Same(t, nop, lfc) -} - func TestLogsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.LogsSink) p2 := new(consumertest.LogsSink) @@ -57,6 +51,9 @@ func TestLogsMultiplexingNonMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should be marked as read only. + assert.True(t, ld.IsReadOnly()) } func TestLogsMultiplexingMutating(t *testing.T) { @@ -91,6 +88,46 @@ func TestLogsMultiplexingMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) +} + +func TestReadOnlyLogsMultiplexingMutating(t *testing.T) { + p1 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + p2 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + + lfc := NewLogs([]consumer.Logs{p1, p2, p3}) + assert.False(t, lfc.Capabilities().MutatesData) + ldOrig := testdata.GenerateLogs(1) + ld := testdata.GenerateLogs(1) + ld.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := lfc.ConsumeLogs(context.Background(), ld) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, ld != p1.AllLogs()[0]) + assert.True(t, ld != p1.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p1.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p1.AllLogs()[1]) + + assert.True(t, ld != p2.AllLogs()[0]) + assert.True(t, ld != p2.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p2.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p2.AllLogs()[1]) + + assert.True(t, ld != p3.AllLogs()[0]) + assert.True(t, ld != p3.AllLogs()[1]) + assert.EqualValues(t, ldOrig, p3.AllLogs()[0]) + assert.EqualValues(t, ldOrig, p3.AllLogs()[1]) } func TestLogsMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +163,9 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) { assert.True(t, ld != p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) } func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +200,9 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, ld == p3.AllLogs()[1]) assert.EqualValues(t, ld, p3.AllLogs()[0]) assert.EqualValues(t, ld, p3.AllLogs()[1]) + + // The data should not be marked as read only. + assert.False(t, ld.IsReadOnly()) } func TestLogsWhenErrors(t *testing.T) { diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index f1c1280e4c3..13ea0efe0cf 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -18,36 +18,22 @@ import ( // NewMetrics wraps multiple metrics consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewMetrics(mcs []consumer.Metrics) consumer.Metrics { - if len(mcs) == 1 { - // Don't wrap if no need to do it. - return mcs[0] - } - var pass []consumer.Metrics - var clone []consumer.Metrics - for i := 0; i < len(mcs)-1; i++ { - if !mcs[i].Capabilities().MutatesData { - pass = append(pass, mcs[i]) + mc := &metricsConsumer{} + for i := 0; i < len(mcs); i++ { + if mcs[i].Capabilities().MutatesData { + mc.mutable = append(mc.mutable, mcs[i]) } else { - clone = append(clone, mcs[i]) + mc.readonly = append(mc.readonly, mcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !mcs[len(mcs)-1].Capabilities().MutatesData { - pass = append(pass, mcs[len(mcs)-1]) - } else { - clone = append(clone, mcs[len(mcs)-1]) - } - return &metricsConsumer{pass: pass, clone: clone} + return mc } type metricsConsumer struct { - pass []consumer.Metrics - clone []consumer.Metrics + mutable []consumer.Metrics + readonly []consumer.Metrics } func (msc *metricsConsumer) Capabilities() consumer.Capabilities { @@ -57,20 +43,40 @@ func (msc *metricsConsumer) Capabilities() consumer.Capabilities { // ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one. func (msc *metricsConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, mc := range msc.clone { - clonedMetrics := pmetric.NewMetrics() - md.CopyTo(clonedMetrics) - errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, clonedMetrics)) + + if len(msc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(msc.mutable)-1; i++ { + errs = multierr.Append(errs, msc.mutable[i].ConsumeMetrics(ctx, cloneMetrics(md))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := msc.mutable[len(msc.mutable)-1] + if len(msc.readonly) == 0 && !md.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, md)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeMetrics(ctx, cloneMetrics(md))) + } } - for _, mc := range msc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(msc.readonly) > 1 && !md.IsReadOnly() { + md.MarkReadOnly() + } + for _, mc := range msc.readonly { errs = multierr.Append(errs, mc.ConsumeMetrics(ctx, md)) } + return errs } +func cloneMetrics(md pmetric.Metrics) pmetric.Metrics { + clonedMetrics := pmetric.NewMetrics() + md.CopyTo(clonedMetrics) + return clonedMetrics +} + var _ connector.MetricsRouter = (*metricsRouter)(nil) type metricsRouter struct { diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index 8cdfeeb51fa..bbf86990944 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func TestMetricsNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - mfc := NewMetrics([]consumer.Metrics{nop}) - assert.Same(t, nop, mfc) -} - func TestMetricsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.MetricsSink) p2 := new(consumertest.MetricsSink) @@ -57,6 +51,9 @@ func TestMetricsMultiplexingNonMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should be marked as read only. + assert.True(t, md.IsReadOnly()) } func TestMetricsMultiplexingMutating(t *testing.T) { @@ -91,6 +88,46 @@ func TestMetricsMultiplexingMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) +} + +func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) { + p1 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + p2 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + + mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) + assert.False(t, mfc.Capabilities().MutatesData) + mdOrig := testdata.GenerateMetrics(1) + md := testdata.GenerateMetrics(1) + md.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := mfc.ConsumeMetrics(context.Background(), md) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, md != p1.AllMetrics()[0]) + assert.True(t, md != p1.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p1.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p1.AllMetrics()[1]) + + assert.True(t, md != p2.AllMetrics()[0]) + assert.True(t, md != p2.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p2.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p2.AllMetrics()[1]) + + assert.True(t, md != p3.AllMetrics()[0]) + assert.True(t, md != p3.AllMetrics()[1]) + assert.EqualValues(t, mdOrig, p3.AllMetrics()[0]) + assert.EqualValues(t, mdOrig, p3.AllMetrics()[1]) } func TestMetricsMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +163,9 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) { assert.True(t, md != p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) } func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +200,9 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, md == p3.AllMetrics()[1]) assert.EqualValues(t, md, p3.AllMetrics()[0]) assert.EqualValues(t, md, p3.AllMetrics()[1]) + + // The data should not be marked as read only. + assert.False(t, md.IsReadOnly()) } func TestMetricsWhenErrors(t *testing.T) { diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index 89ecb166ec5..bb6c30ae84e 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -18,36 +18,22 @@ import ( // NewTraces wraps multiple trace consumers in a single one. // It fanouts the incoming data to all the consumers, and does smart routing: // - Clones only to the consumer that needs to mutate the data. -// - If all consumers needs to mutate the data one will get the original data. +// - If all consumers needs to mutate the data one will get the original mutable data. func NewTraces(tcs []consumer.Traces) consumer.Traces { - if len(tcs) == 1 { - // Don't wrap if no need to do it. - return tcs[0] - } - var pass []consumer.Traces - var clone []consumer.Traces - for i := 0; i < len(tcs)-1; i++ { - if !tcs[i].Capabilities().MutatesData { - pass = append(pass, tcs[i]) + tc := &tracesConsumer{} + for i := 0; i < len(tcs); i++ { + if tcs[i].Capabilities().MutatesData { + tc.mutable = append(tc.mutable, tcs[i]) } else { - clone = append(clone, tcs[i]) + tc.readonly = append(tc.readonly, tcs[i]) } } - // Give the original data to the last consumer if no other read-only consumer, - // otherwise put it in the right bucket. Never share the same data between - // a mutating and a non-mutating consumer since the non-mutating consumer may process - // data async and the mutating consumer may change the data before that. - if len(pass) == 0 || !tcs[len(tcs)-1].Capabilities().MutatesData { - pass = append(pass, tcs[len(tcs)-1]) - } else { - clone = append(clone, tcs[len(tcs)-1]) - } - return &tracesConsumer{pass: pass, clone: clone} + return tc } type tracesConsumer struct { - pass []consumer.Traces - clone []consumer.Traces + mutable []consumer.Traces + readonly []consumer.Traces } func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { @@ -57,20 +43,40 @@ func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { // ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one. func (tsc *tracesConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { var errs error - // Initially pass to clone exporter to avoid the case where the optimization of sending - // the incoming data to a mutating consumer is used that may change the incoming data before - // cloning. - for _, tc := range tsc.clone { - clonedTraces := ptrace.NewTraces() - td.CopyTo(clonedTraces) - errs = multierr.Append(errs, tc.ConsumeTraces(ctx, clonedTraces)) + + if len(tsc.mutable) > 0 { + // Clone the data before sending to all mutating consumers except the last one. + for i := 0; i < len(tsc.mutable)-1; i++ { + errs = multierr.Append(errs, tsc.mutable[i].ConsumeTraces(ctx, cloneTraces(td))) + } + // Send data as is to the last mutating consumer only if there are no other non-mutating consumers and the + // data is mutable. Never share the same data between a mutating and a non-mutating consumer since the + // non-mutating consumer may process data async and the mutating consumer may change the data before that. + lastConsumer := tsc.mutable[len(tsc.mutable)-1] + if len(tsc.readonly) == 0 && !td.IsReadOnly() { + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, td)) + } else { + errs = multierr.Append(errs, lastConsumer.ConsumeTraces(ctx, cloneTraces(td))) + } } - for _, tc := range tsc.pass { + + // Mark the data as read-only if it will be sent to more than one read-only consumer. + if len(tsc.readonly) > 1 && !td.IsReadOnly() { + td.MarkReadOnly() + } + for _, tc := range tsc.readonly { errs = multierr.Append(errs, tc.ConsumeTraces(ctx, td)) } + return errs } +func cloneTraces(td ptrace.Traces) ptrace.Traces { + clonedTraces := ptrace.NewTraces() + td.CopyTo(clonedTraces) + return clonedTraces +} + var _ connector.TracesRouter = (*tracesRouter)(nil) type tracesRouter struct { diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index d2147b36167..ceda83ecebb 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -20,12 +20,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -func TestTracesNotMultiplexing(t *testing.T) { - nop := consumertest.NewNop() - tfc := NewTraces([]consumer.Traces{nop}) - assert.Same(t, nop, tfc) -} - func TestTracesMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.TracesSink) p2 := new(consumertest.TracesSink) @@ -57,6 +51,9 @@ func TestTracesMultiplexingNonMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should be marked as read only. + assert.True(t, td.IsReadOnly()) } func TestTracesMultiplexingMutating(t *testing.T) { @@ -91,6 +88,47 @@ func TestTracesMultiplexingMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) +} + +func TestReadOnlyTracesMultiplexingMutating(t *testing.T) { + p1 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + p2 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + + tfc := NewTraces([]consumer.Traces{p1, p2, p3}) + assert.False(t, tfc.Capabilities().MutatesData) + + tdOrig := testdata.GenerateTraces(1) + td := testdata.GenerateTraces(1) + td.MarkReadOnly() + + for i := 0; i < 2; i++ { + err := tfc.ConsumeTraces(context.Background(), td) + if err != nil { + t.Errorf("Wanted nil got error") + return + } + } + + // All consumers should receive the cloned data. + + assert.True(t, td != p1.AllTraces()[0]) + assert.True(t, td != p1.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p1.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p1.AllTraces()[1]) + + assert.True(t, td != p2.AllTraces()[0]) + assert.True(t, td != p2.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p2.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p2.AllTraces()[1]) + + assert.True(t, td != p3.AllTraces()[0]) + assert.True(t, td != p3.AllTraces()[1]) + assert.EqualValues(t, tdOrig, p3.AllTraces()[0]) + assert.EqualValues(t, tdOrig, p3.AllTraces()[1]) } func TestTracesMultiplexingMixLastMutating(t *testing.T) { @@ -126,6 +164,9 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) { assert.True(t, td != p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) } func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { @@ -160,6 +201,9 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { assert.True(t, td == p3.AllTraces()[1]) assert.EqualValues(t, td, p3.AllTraces()[0]) assert.EqualValues(t, td, p3.AllTraces()[1]) + + // The data should not be marked as read only. + assert.False(t, td.IsReadOnly()) } func TestTracesWhenErrors(t *testing.T) { diff --git a/pdata/plog/logs.go b/pdata/plog/logs.go index a6187fbc0f6..490526090f8 100644 --- a/pdata/plog/logs.go +++ b/pdata/plog/logs.go @@ -21,11 +21,20 @@ func (ms Logs) getOrig() *otlpcollectorlog.ExportLogsServiceRequest { return internal.GetOrigLogs(internal.Logs(ms)) } +func (ms Logs) getState() *internal.State { + return internal.GetLogsState(internal.Logs(ms)) +} + // NewLogs creates a new Logs struct. func NewLogs() Logs { return newLogs(&otlpcollectorlog.ExportLogsServiceRequest{}) } +// IsReadOnly returns true if this Logs instance is read-only. +func (ms Logs) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Logs instance overriding the destination. func (ms Logs) CopyTo(dest Logs) { ms.ResourceLogs().CopyTo(dest.ResourceLogs()) diff --git a/pdata/plog/logs_test.go b/pdata/plog/logs_test.go index 6eeefcf509c..9d3feef4d98 100644 --- a/pdata/plog/logs_test.go +++ b/pdata/plog/logs_test.go @@ -117,9 +117,11 @@ func TestLogsCopyTo(t *testing.T) { func TestReadOnlyLogsInvalidUsage(t *testing.T) { logs := NewLogs() + assert.False(t, logs.IsReadOnly()) res := logs.ResourceLogs().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") logs.MarkReadOnly() + assert.True(t, logs.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/pdata/pmetric/metrics.go b/pdata/pmetric/metrics.go index 5a8c0f29974..91195ca4dfa 100644 --- a/pdata/pmetric/metrics.go +++ b/pdata/pmetric/metrics.go @@ -21,11 +21,20 @@ func (ms Metrics) getOrig() *otlpcollectormetrics.ExportMetricsServiceRequest { return internal.GetOrigMetrics(internal.Metrics(ms)) } +func (ms Metrics) getState() *internal.State { + return internal.GetMetricsState(internal.Metrics(ms)) +} + // NewMetrics creates a new Metrics struct. func NewMetrics() Metrics { return newMetrics(&otlpcollectormetrics.ExportMetricsServiceRequest{}) } +// IsReadOnly returns true if this Metrics instance is read-only. +func (ms Metrics) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Metrics instance overriding the destination. func (ms Metrics) CopyTo(dest Metrics) { ms.ResourceMetrics().CopyTo(dest.ResourceMetrics()) diff --git a/pdata/pmetric/metrics_test.go b/pdata/pmetric/metrics_test.go index 9b2b5c2b70d..3c059643a84 100644 --- a/pdata/pmetric/metrics_test.go +++ b/pdata/pmetric/metrics_test.go @@ -638,9 +638,11 @@ func TestMetricsCopyTo(t *testing.T) { func TestReadOnlyMetricsInvalidUsage(t *testing.T) { metrics := NewMetrics() + assert.False(t, metrics.IsReadOnly()) res := metrics.ResourceMetrics().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") metrics.MarkReadOnly() + assert.True(t, metrics.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/pdata/ptrace/traces.go b/pdata/ptrace/traces.go index 0d8294098a6..a4b71e17853 100644 --- a/pdata/ptrace/traces.go +++ b/pdata/ptrace/traces.go @@ -21,11 +21,20 @@ func (ms Traces) getOrig() *otlpcollectortrace.ExportTraceServiceRequest { return internal.GetOrigTraces(internal.Traces(ms)) } +func (ms Traces) getState() *internal.State { + return internal.GetTracesState(internal.Traces(ms)) +} + // NewTraces creates a new Traces struct. func NewTraces() Traces { return newTraces(&otlpcollectortrace.ExportTraceServiceRequest{}) } +// IsReadOnly returns true if this Traces instance is read-only. +func (ms Traces) IsReadOnly() bool { + return *ms.getState() == internal.StateReadOnly +} + // CopyTo copies the Traces instance overriding the destination. func (ms Traces) CopyTo(dest Traces) { ms.ResourceSpans().CopyTo(dest.ResourceSpans()) diff --git a/pdata/ptrace/traces_test.go b/pdata/ptrace/traces_test.go index a09e4844d15..8884349485f 100644 --- a/pdata/ptrace/traces_test.go +++ b/pdata/ptrace/traces_test.go @@ -118,9 +118,11 @@ func TestTracesCopyTo(t *testing.T) { func TestReadOnlyTracesInvalidUsage(t *testing.T) { traces := NewTraces() + assert.False(t, traces.IsReadOnly()) res := traces.ResourceSpans().AppendEmpty().Resource() res.Attributes().PutStr("k1", "v1") traces.MarkReadOnly() + assert.True(t, traces.IsReadOnly()) assert.Panics(t, func() { res.Attributes().PutStr("k2", "v2") }) } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 4e6bc0256eb..168d3832558 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -861,22 +861,34 @@ func TestConnectorPipelinesGraph(t *testing.T) { for _, e := range allExporters[component.DataTypeTraces] { tracesExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(tracesExporter.Traces)) + expected := testdata.GenerateTraces(1) + if len(allExporters[component.DataTypeTraces]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateTraces(1), tracesExporter.Traces[0]) + assert.EqualValues(t, expected, tracesExporter.Traces[0]) } } for _, e := range allExporters[component.DataTypeMetrics] { metricsExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(metricsExporter.Metrics)) + expected := testdata.GenerateMetrics(1) + if len(allExporters[component.DataTypeMetrics]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateMetrics(1), metricsExporter.Metrics[0]) + assert.EqualValues(t, expected, metricsExporter.Metrics[0]) } } for _, e := range allExporters[component.DataTypeLogs] { logsExporter := e.(*testcomponents.ExampleExporter) assert.Equal(t, test.expectedPerExporter, len(logsExporter.Logs)) + expected := testdata.GenerateLogs(1) + if len(allExporters[component.DataTypeLogs]) > 1 { + expected.MarkReadOnly() // multiple read-only exporters should get read-only pdata + } for i := 0; i < test.expectedPerExporter; i++ { - assert.EqualValues(t, testdata.GenerateLogs(1), logsExporter.Logs[0]) + assert.EqualValues(t, expected, logsExporter.Logs[0]) } } })