Skip to content

Commit

Permalink
[chore][graph] Decompose buildConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 2, 2024
1 parent a24f914 commit 012c912
Showing 1 changed file with 203 additions and 157 deletions.
360 changes: 203 additions & 157 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,173 +55,219 @@ func (n *connectorNode) buildComponent(
) error {
tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}

switch n.rcvrPipelineType {
case pipeline.SignalTraces:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewTraces(conn, capability)
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
return n.buildTraces(ctx, set, builder, nexts)
case pipeline.SignalMetrics:
return n.buildMetrics(ctx, set, builder, nexts)
case pipeline.SignalLogs:
return n.buildLogs(ctx, set, builder, nexts)
case componentprofiles.SignalProfiles:
return n.buildProfiles(ctx, set, builder, nexts)
}
return nil

Check warning on line 68 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L68

Added line #L68 was not covered by tests
}

func (n *connectorNode) buildTraces(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 87 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L87

Added line #L87 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 93 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L93

Added line #L93 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 99 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L99

Added line #L99 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 105 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L105

Added line #L105 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalTraces {
n.baseConsumer = capabilityconsumer.NewTraces(
n.Component.(connector.Traces),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildMetrics(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 135 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L135

Added line #L135 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability)
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 141 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L141

Added line #L141 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewLogs(conn, capability)
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 147 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L147

Added line #L147 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewProfiles(conn, capability)
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 153 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L153

Added line #L153 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalMetrics {
n.baseConsumer = capabilityconsumer.NewMetrics(
n.Component.(connector.Metrics),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildLogs(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 183 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L183

Added line #L183 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 189 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L189

Added line #L189 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 195 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L195

Added line #L195 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 201 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L201

Added line #L201 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalLogs {
n.baseConsumer = capabilityconsumer.NewLogs(
n.Component.(connector.Logs),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildProfiles(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 231 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L231

Added line #L231 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 237 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L237

Added line #L237 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 243 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L243

Added line #L243 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 249 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L249

Added line #L249 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == componentprofiles.SignalProfiles {
n.baseConsumer = capabilityconsumer.NewProfiles(
n.Component.(connectorprofiles.Profiles),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
capabilities := base.Capabilities()
for _, next := range nexts {
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
}
return capabilities
}

0 comments on commit 012c912

Please sign in to comment.