Skip to content

Commit

Permalink
[chore][graph] Remove connectorNode's separate baseConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 2, 2024
1 parent 4ace638 commit 1aac88d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 154 deletions.
169 changes: 60 additions & 109 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type connectorNode struct {
exprPipelineType pipeline.Signal
rcvrPipelineType pipeline.Signal
component.Component
baseConsumer
}

func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode {
Expand All @@ -43,7 +42,7 @@ func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID
}

func (n *connectorNode) getConsumer() baseConsumer {
return n.baseConsumer
return n.Component.(baseConsumer)
}

func (n *connectorNode) buildComponent(
Expand Down Expand Up @@ -80,40 +79,28 @@ func (n *connectorNode) buildTraces(
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
if n.exprPipelineType == pipeline.SignalTraces {
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
n.Component = componentTraces{
Component: conn,
Traces: capabilityconsumer.NewTraces(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
return nil
}

if n.exprPipelineType == pipeline.SignalTraces {
n.baseConsumer = capabilityconsumer.NewTraces(
n.Component.(connector.Traces),
aggregateCapabilities(n.baseConsumer, nexts),
)
var err error
switch n.exprPipelineType {
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildMetrics(
Expand All @@ -128,40 +115,28 @@ func (n *connectorNode) buildMetrics(
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
if n.exprPipelineType == pipeline.SignalMetrics {
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
n.Component = componentMetrics{
Component: conn,
Metrics: capabilityconsumer.NewMetrics(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
return nil
}

if n.exprPipelineType == pipeline.SignalMetrics {
n.baseConsumer = capabilityconsumer.NewMetrics(
n.Component.(connector.Metrics),
aggregateCapabilities(n.baseConsumer, nexts),
)
var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildLogs(
Expand All @@ -176,40 +151,28 @@ func (n *connectorNode) buildLogs(
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
if n.exprPipelineType == pipeline.SignalLogs {
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
n.Component = componentLogs{
Component: conn,
Logs: capabilityconsumer.NewLogs(conn, aggregateCap(conn, nexts)),
}
n.Component, n.baseConsumer = conn, conn
return nil
}

if n.exprPipelineType == pipeline.SignalLogs {
n.baseConsumer = capabilityconsumer.NewLogs(
n.Component.(connector.Logs),
aggregateCapabilities(n.baseConsumer, nexts),
)
var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
}
return nil
return err
}

func (n *connectorNode) buildProfiles(
Expand All @@ -224,47 +187,35 @@ func (n *connectorNode) buildProfiles(
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
if n.exprPipelineType == componentprofiles.SignalProfiles {
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
n.Component = componentProfiles{
Component: conn,
Profiles: capabilityconsumer.NewProfiles(conn, aggregateCap(conn, nexts)),
}
return nil
}

if n.exprPipelineType == componentprofiles.SignalProfiles {
n.baseConsumer = capabilityconsumer.NewProfiles(
n.Component.(connectorprofiles.Profiles),
aggregateCapabilities(n.baseConsumer, nexts),
)
var err error
switch n.exprPipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
}
return nil
return err
}

// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
// that the connector itself may mutate the data and pass it along.
func aggregateCap(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
capabilities := base.Capabilities()
for _, next := range nexts {
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
Expand Down
22 changes: 22 additions & 0 deletions service/internal/graph/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
Expand All @@ -15,3 +17,23 @@ type baseConsumer interface {
type consumerNode interface {
getConsumer() baseConsumer
}

type componentTraces struct {
component.Component
consumer.Traces
}

type componentMetrics struct {
component.Component
consumer.Metrics
}

type componentLogs struct {
component.Component
consumer.Logs
}

type componentProfiles struct {
component.Component
consumerprofiles.Profiles
}
57 changes: 12 additions & 45 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectorprofiles"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -832,17 +831,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
require.Empty(t, e.Logs)
require.Empty(t, e.Profiles)
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Started())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand All @@ -857,17 +848,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
case *receiverNode:
require.True(t, c.Component.(*testcomponents.ExampleReceiver).Started())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Started())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Started())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand Down Expand Up @@ -937,17 +920,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
case *receiverNode:
require.True(t, c.Component.(*testcomponents.ExampleReceiver).Stopped())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Stopped())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand All @@ -963,17 +938,9 @@ func TestConnectorPipelinesGraph(t *testing.T) {
e := c.Component.(*testcomponents.ExampleExporter)
require.True(t, e.Stopped())
case *connectorNode:
// connector needs to be unwrapped to access component as ExampleConnector
switch ct := c.Component.(type) {
case connector.Traces:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Metrics:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connector.Logs:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
case connectorprofiles.Profiles:
require.True(t, ct.(*testcomponents.ExampleConnector).Stopped())
}
exConn := unwrapExampleConnector(c)
require.NotNil(t, exConn)
require.True(t, exConn.Stopped())
default:
require.Fail(t, fmt.Sprintf("unexpected type %T", c))
}
Expand Down
Loading

0 comments on commit 1aac88d

Please sign in to comment.