Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][graph] Decompose buildConnector #11330

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
360 changes: 203 additions & 157 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,173 +55,219 @@
) error {
tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}

switch n.rcvrPipelineType {
case pipeline.SignalTraces:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewTraces(conn, capability)
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
return n.buildTraces(ctx, set, builder, nexts)
case pipeline.SignalMetrics:
return n.buildMetrics(ctx, set, builder, nexts)
case pipeline.SignalLogs:
return n.buildLogs(ctx, set, builder, nexts)
case componentprofiles.SignalProfiles:
return n.buildProfiles(ctx, set, builder, nexts)
}
return nil

Check warning on line 68 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L68

Added line #L68 was not covered by tests
}

func (n *connectorNode) buildTraces(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 87 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L87

Added line #L87 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 93 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L93

Added line #L93 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 99 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L99

Added line #L99 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err

Check warning on line 105 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L105

Added line #L105 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalTraces {
n.baseConsumer = capabilityconsumer.NewTraces(
n.Component.(connector.Traces),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildMetrics(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 135 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L135

Added line #L135 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability)
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 141 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L141

Added line #L141 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewLogs(conn, capability)
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 147 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L147

Added line #L147 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewProfiles(conn, capability)
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err

Check warning on line 153 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L153

Added line #L153 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalMetrics {
n.baseConsumer = capabilityconsumer.NewMetrics(
n.Component.(connector.Metrics),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildLogs(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 183 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L183

Added line #L183 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 189 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L189

Added line #L189 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 195 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L195

Added line #L195 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err

Check warning on line 201 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L201

Added line #L201 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalLogs {
n.baseConsumer = capabilityconsumer.NewLogs(
n.Component.(connector.Logs),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildProfiles(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 231 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L231

Added line #L231 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 237 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L237

Added line #L237 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 243 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L243

Added line #L243 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err

Check warning on line 249 in service/internal/graph/connector.go

View check run for this annotation

Codecov / codecov/patch

service/internal/graph/connector.go#L249

Added line #L249 was not covered by tests
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == componentprofiles.SignalProfiles {
n.baseConsumer = capabilityconsumer.NewProfiles(
n.Component.(connectorprofiles.Profiles),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
capabilities := base.Capabilities()
for _, next := range nexts {
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
}
return capabilities
}
Loading