diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index d9ad44f885c..c117362176d 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -30,7 +30,6 @@ type connectorNode struct { exprPipelineType pipeline.Signal rcvrPipelineType pipeline.Signal component.Component - baseConsumer } func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode { @@ -43,7 +42,7 @@ func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID } func (n *connectorNode) getConsumer() baseConsumer { - return n.baseConsumer + return n.Component.(baseConsumer) } func (n *connectorNode) buildComponent( @@ -80,40 +79,28 @@ func (n *connectorNode) buildTraces( } next := connector.NewTracesRouter(consumers) - switch n.exprPipelineType { - case pipeline.SignalTraces: + if n.exprPipelineType == pipeline.SignalTraces { conn, err := builder.CreateTracesToTraces(ctx, set, next) if err != nil { return err } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToTraces(ctx, set, next) - if err != nil { - return err + n.Component = componentTraces{ + Component: conn, + Traces: capabilityconsumer.NewTraces(conn, aggregateCap(conn, nexts)), } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn + return nil } - if n.exprPipelineType == pipeline.SignalTraces { - n.baseConsumer = capabilityconsumer.NewTraces( - n.Component.(connector.Traces), - aggregateCapabilities(n.baseConsumer, nexts), - ) + var err error + switch n.exprPipelineType { + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetricsToTraces(ctx, set, next) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogsToTraces(ctx, set, next) + case componentprofiles.SignalProfiles: + n.Component, err = builder.CreateProfilesToTraces(ctx, set, next) } - return nil + return err } func (n *connectorNode) buildMetrics( @@ -128,40 +115,28 @@ func (n *connectorNode) buildMetrics( } next := connector.NewMetricsRouter(consumers) - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: + if n.exprPipelineType == pipeline.SignalMetrics { conn, err := builder.CreateMetricsToMetrics(ctx, set, next) if err != nil { return err } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToMetrics(ctx, set, next) - if err != nil { - return err + n.Component = componentMetrics{ + Component: conn, + Metrics: capabilityconsumer.NewMetrics(conn, aggregateCap(conn, nexts)), } - n.Component, n.baseConsumer = conn, conn + return nil } - if n.exprPipelineType == pipeline.SignalMetrics { - n.baseConsumer = capabilityconsumer.NewMetrics( - n.Component.(connector.Metrics), - aggregateCapabilities(n.baseConsumer, nexts), - ) + var err error + switch n.exprPipelineType { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTracesToMetrics(ctx, set, next) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogsToMetrics(ctx, set, next) + case componentprofiles.SignalProfiles: + n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next) } - return nil + return err } func (n *connectorNode) buildLogs( @@ -176,40 +151,28 @@ func (n *connectorNode) buildLogs( } next := connector.NewLogsRouter(consumers) - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: + if n.exprPipelineType == pipeline.SignalLogs { conn, err := builder.CreateLogsToLogs(ctx, set, next) if err != nil { return err } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToLogs(ctx, set, next) - if err != nil { - return err + n.Component = componentLogs{ + Component: conn, + Logs: capabilityconsumer.NewLogs(conn, aggregateCap(conn, nexts)), } - n.Component, n.baseConsumer = conn, conn + return nil } - if n.exprPipelineType == pipeline.SignalLogs { - n.baseConsumer = capabilityconsumer.NewLogs( - n.Component.(connector.Logs), - aggregateCapabilities(n.baseConsumer, nexts), - ) + var err error + switch n.exprPipelineType { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTracesToLogs(ctx, set, next) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetricsToLogs(ctx, set, next) + case componentprofiles.SignalProfiles: + n.Component, err = builder.CreateProfilesToLogs(ctx, set, next) } - return nil + return err } func (n *connectorNode) buildProfiles( @@ -224,47 +187,35 @@ func (n *connectorNode) buildProfiles( } next := connectorprofiles.NewProfilesRouter(consumers) - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: + if n.exprPipelineType == componentprofiles.SignalProfiles { conn, err := builder.CreateProfilesToProfiles(ctx, set, next) if err != nil { return err } - n.Component, n.baseConsumer = conn, conn + n.Component = componentProfiles{ + Component: conn, + Profiles: capabilityconsumer.NewProfiles(conn, aggregateCap(conn, nexts)), + } + return nil } - if n.exprPipelineType == componentprofiles.SignalProfiles { - n.baseConsumer = capabilityconsumer.NewProfiles( - n.Component.(connectorprofiles.Profiles), - aggregateCapabilities(n.baseConsumer, nexts), - ) + var err error + switch n.exprPipelineType { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTracesToProfiles(ctx, set, next) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogsToProfiles(ctx, set, next) } - return nil + return err } // When connecting pipelines of the same data type, the connector must // inherit the capabilities of pipelines in which it is acting as a receiver. // Since the incoming and outgoing data types are the same, we must also consider -// that the connector itself may MutatesData. -func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities { +// that the connector itself may mutate the data and pass it along. +func aggregateCap(base baseConsumer, nexts []baseConsumer) consumer.Capabilities { capabilities := base.Capabilities() for _, next := range nexts { capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData diff --git a/service/internal/graph/consumer.go b/service/internal/graph/consumer.go index 2cdce4a7534..6bc6b96ca02 100644 --- a/service/internal/graph/consumer.go +++ b/service/internal/graph/consumer.go @@ -4,7 +4,9 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" ) // baseConsumer redeclared here since not public in consumer package. May consider to make that public. @@ -15,3 +17,23 @@ type baseConsumer interface { type consumerNode interface { getConsumer() baseConsumer } + +type componentTraces struct { + component.Component + consumer.Traces +} + +type componentMetrics struct { + component.Component + consumer.Metrics +} + +type componentLogs struct { + component.Component + consumer.Logs +} + +type componentProfiles struct { + component.Component + consumerprofiles.Profiles +} diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 6c4dc61307a..809825c0d58 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" @@ -832,17 +831,9 @@ func TestConnectorPipelinesGraph(t *testing.T) { require.Empty(t, e.Logs) require.Empty(t, e.Profiles) case *connectorNode: - // connector needs to be unwrapped to access component as ExampleConnector - switch ct := c.Component.(type) { - case connector.Traces: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connector.Metrics: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connector.Logs: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connectorprofiles.Profiles: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - } + exConn := unwrapExampleConnector(c) + require.NotNil(t, exConn) + require.True(t, exConn.Started()) default: require.Fail(t, fmt.Sprintf("unexpected type %T", c)) } @@ -857,17 +848,9 @@ func TestConnectorPipelinesGraph(t *testing.T) { case *receiverNode: require.True(t, c.Component.(*testcomponents.ExampleReceiver).Started()) case *connectorNode: - // connector needs to be unwrapped to access component as ExampleConnector - switch ct := c.Component.(type) { - case connector.Traces: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connector.Metrics: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connector.Logs: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - case connectorprofiles.Profiles: - require.True(t, ct.(*testcomponents.ExampleConnector).Started()) - } + exConn := unwrapExampleConnector(c) + require.NotNil(t, exConn) + require.True(t, exConn.Started()) default: require.Fail(t, fmt.Sprintf("unexpected type %T", c)) } @@ -937,17 +920,9 @@ func TestConnectorPipelinesGraph(t *testing.T) { case *receiverNode: require.True(t, c.Component.(*testcomponents.ExampleReceiver).Stopped()) case *connectorNode: - // connector needs to be unwrapped to access component as ExampleConnector - switch ct := c.Component.(type) { - case connector.Traces: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connector.Metrics: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connector.Logs: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connectorprofiles.Profiles: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - } + exConn := unwrapExampleConnector(c) + require.NotNil(t, exConn) + require.True(t, exConn.Stopped()) default: require.Fail(t, fmt.Sprintf("unexpected type %T", c)) } @@ -963,17 +938,9 @@ func TestConnectorPipelinesGraph(t *testing.T) { e := c.Component.(*testcomponents.ExampleExporter) require.True(t, e.Stopped()) case *connectorNode: - // connector needs to be unwrapped to access component as ExampleConnector - switch ct := c.Component.(type) { - case connector.Traces: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connector.Metrics: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connector.Logs: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - case connectorprofiles.Profiles: - require.True(t, ct.(*testcomponents.ExampleConnector).Stopped()) - } + exConn := unwrapExampleConnector(c) + require.NotNil(t, exConn) + require.True(t, exConn.Stopped()) default: require.Fail(t, fmt.Sprintf("unexpected type %T", c)) } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 43f1e64bb6c..0a9165622c4 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverprofiles" + "go.opentelemetry.io/collector/service/internal/testcomponents" "go.opentelemetry.io/collector/service/pipelines" ) @@ -146,6 +147,29 @@ func expectedInstances(m pipelines.Config, pID pipeline.ID) (int, int) { return r, e } +// connector needs to be unwrapped to access component as ExampleConnector +func unwrapExampleConnector(c *connectorNode) *testcomponents.ExampleConnector { + switch ct := c.Component.(type) { + case componentTraces: // consumes traces, emits traces + return ct.Component.(*testcomponents.ExampleConnector) + case connector.Traces: // consumes traces, emits something else + return ct.(*testcomponents.ExampleConnector) + case componentMetrics: // consumes metrics, emits metrics + return ct.Component.(*testcomponents.ExampleConnector) + case connector.Metrics: // consumes metrics, emits something else + return ct.(*testcomponents.ExampleConnector) + case componentLogs: // consumes logs, emits logs + return ct.Component.(*testcomponents.ExampleConnector) + case connector.Logs: // consumes logs, emits something else + return ct.(*testcomponents.ExampleConnector) + case componentProfiles: // consumes profiles, emits profiles + return ct.Component.(*testcomponents.ExampleConnector) + case connectorprofiles.Profiles: // consumes profiles, emits something else + return ct.(*testcomponents.ExampleConnector) + } + return nil +} + func newBadReceiverFactory() receiver.Factory { return receiver.NewFactory(component.MustNewType("bf"), func() component.Config { return &struct{}{}