From 012c9129dbe7c6a51594a91d325ceda8ee85839b Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 1 Oct 2024 17:03:41 -0400 Subject: [PATCH] [chore][graph] Decompose buildConnector --- service/internal/graph/connector.go | 360 ++++++++++++++++------------ 1 file changed, 203 insertions(+), 157 deletions(-) diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index 8de04f1994e..d9ad44f885c 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -55,173 +55,219 @@ func (n *connectorNode) buildComponent( ) error { tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType) set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - switch n.rcvrPipelineType { case pipeline.SignalTraces: - capability := consumer.Capabilities{MutatesData: false} - consumers := make(map[pipeline.ID]consumer.Traces, len(nexts)) - for _, next := range nexts { - consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces) - capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData - } - next := connector.NewTracesRouter(consumers) - - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component = conn - // When connecting pipelines of the same data type, the connector must - // inherit the capabilities of pipelines in which it is acting as a receiver. - // Since the incoming and outgoing data types are the same, we must also consider - // that the connector itself may MutatesData. - capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData - n.baseConsumer = capabilityconsumer.NewTraces(conn, capability) - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToTraces(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn + return n.buildTraces(ctx, set, builder, nexts) + case pipeline.SignalMetrics: + return n.buildMetrics(ctx, set, builder, nexts) + case pipeline.SignalLogs: + return n.buildLogs(ctx, set, builder, nexts) + case componentprofiles.SignalProfiles: + return n.buildProfiles(ctx, set, builder, nexts) + } + return nil +} + +func (n *connectorNode) buildTraces( + ctx context.Context, + set connector.Settings, + builder *builders.ConnectorBuilder, + nexts []baseConsumer, +) error { + consumers := make(map[pipeline.ID]consumer.Traces, len(nexts)) + for _, next := range nexts { + consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces) + } + next := connector.NewTracesRouter(consumers) + + switch n.exprPipelineType { + case pipeline.SignalTraces: + conn, err := builder.CreateTracesToTraces(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalMetrics: + conn, err := builder.CreateMetricsToTraces(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalLogs: + conn, err := builder.CreateLogsToTraces(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case componentprofiles.SignalProfiles: + conn, err := builder.CreateProfilesToTraces(ctx, set, next) + if err != nil { + return err } + n.Component, n.baseConsumer = conn, conn + } + + if n.exprPipelineType == pipeline.SignalTraces { + n.baseConsumer = capabilityconsumer.NewTraces( + n.Component.(connector.Traces), + aggregateCapabilities(n.baseConsumer, nexts), + ) + } + return nil +} + +func (n *connectorNode) buildMetrics( + ctx context.Context, + set connector.Settings, + builder *builders.ConnectorBuilder, + nexts []baseConsumer, +) error { + consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts)) + for _, next := range nexts { + consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics) + } + next := connector.NewMetricsRouter(consumers) + switch n.exprPipelineType { + case pipeline.SignalTraces: + conn, err := builder.CreateTracesToMetrics(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn case pipeline.SignalMetrics: - capability := consumer.Capabilities{MutatesData: false} - consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts)) - for _, next := range nexts { - consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics) - capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData - } - next := connector.NewMetricsRouter(consumers) - - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component = conn - // When connecting pipelines of the same data type, the connector must - // inherit the capabilities of pipelines in which it is acting as a receiver. - // Since the incoming and outgoing data types are the same, we must also consider - // that the connector itself may MutatesData. - capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData - n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability) - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToMetrics(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn + conn, err := builder.CreateMetricsToMetrics(ctx, set, next) + if err != nil { + return err } + n.Component, n.baseConsumer = conn, conn case pipeline.SignalLogs: - capability := consumer.Capabilities{MutatesData: false} - consumers := make(map[pipeline.ID]consumer.Logs, len(nexts)) - for _, next := range nexts { - consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs) - capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData - } - next := connector.NewLogsRouter(consumers) - - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component = conn - // When connecting pipelines of the same data type, the connector must - // inherit the capabilities of pipelines in which it is acting as a receiver. - // Since the incoming and outgoing data types are the same, we must also consider - // that the connector itself may MutatesData. - capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData - n.baseConsumer = capabilityconsumer.NewLogs(conn, capability) - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToLogs(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn + conn, err := builder.CreateLogsToMetrics(ctx, set, next) + if err != nil { + return err } + n.Component, n.baseConsumer = conn, conn case componentprofiles.SignalProfiles: - capability := consumer.Capabilities{MutatesData: false} - consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts)) - for _, next := range nexts { - consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles) - capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData - } - next := connectorprofiles.NewProfilesRouter(consumers) - - switch n.exprPipelineType { - case pipeline.SignalTraces: - conn, err := builder.CreateTracesToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalMetrics: - conn, err := builder.CreateMetricsToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case pipeline.SignalLogs: - conn, err := builder.CreateLogsToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component, n.baseConsumer = conn, conn - case componentprofiles.SignalProfiles: - conn, err := builder.CreateProfilesToProfiles(ctx, set, next) - if err != nil { - return err - } - n.Component = conn - // When connecting pipelines of the same data type, the connector must - // inherit the capabilities of pipelines in which it is acting as a receiver. - // Since the incoming and outgoing data types are the same, we must also consider - // that the connector itself may MutatesData. - capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData - n.baseConsumer = capabilityconsumer.NewProfiles(conn, capability) + conn, err := builder.CreateProfilesToMetrics(ctx, set, next) + if err != nil { + return err } + n.Component, n.baseConsumer = conn, conn + } + + if n.exprPipelineType == pipeline.SignalMetrics { + n.baseConsumer = capabilityconsumer.NewMetrics( + n.Component.(connector.Metrics), + aggregateCapabilities(n.baseConsumer, nexts), + ) } return nil } + +func (n *connectorNode) buildLogs( + ctx context.Context, + set connector.Settings, + builder *builders.ConnectorBuilder, + nexts []baseConsumer, +) error { + consumers := make(map[pipeline.ID]consumer.Logs, len(nexts)) + for _, next := range nexts { + consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs) + } + next := connector.NewLogsRouter(consumers) + + switch n.exprPipelineType { + case pipeline.SignalTraces: + conn, err := builder.CreateTracesToLogs(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalMetrics: + conn, err := builder.CreateMetricsToLogs(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalLogs: + conn, err := builder.CreateLogsToLogs(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case componentprofiles.SignalProfiles: + conn, err := builder.CreateProfilesToLogs(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + } + + if n.exprPipelineType == pipeline.SignalLogs { + n.baseConsumer = capabilityconsumer.NewLogs( + n.Component.(connector.Logs), + aggregateCapabilities(n.baseConsumer, nexts), + ) + } + return nil +} + +func (n *connectorNode) buildProfiles( + ctx context.Context, + set connector.Settings, + builder *builders.ConnectorBuilder, + nexts []baseConsumer, +) error { + consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts)) + for _, next := range nexts { + consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles) + } + next := connectorprofiles.NewProfilesRouter(consumers) + + switch n.exprPipelineType { + case pipeline.SignalTraces: + conn, err := builder.CreateTracesToProfiles(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalMetrics: + conn, err := builder.CreateMetricsToProfiles(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case pipeline.SignalLogs: + conn, err := builder.CreateLogsToProfiles(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + case componentprofiles.SignalProfiles: + conn, err := builder.CreateProfilesToProfiles(ctx, set, next) + if err != nil { + return err + } + n.Component, n.baseConsumer = conn, conn + } + + if n.exprPipelineType == componentprofiles.SignalProfiles { + n.baseConsumer = capabilityconsumer.NewProfiles( + n.Component.(connectorprofiles.Profiles), + aggregateCapabilities(n.baseConsumer, nexts), + ) + } + return nil +} + +// When connecting pipelines of the same data type, the connector must +// inherit the capabilities of pipelines in which it is acting as a receiver. +// Since the incoming and outgoing data types are the same, we must also consider +// that the connector itself may MutatesData. +func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities { + capabilities := base.Capabilities() + for _, next := range nexts { + capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData + } + return capabilities +}