diff --git a/service/host.go b/service/host.go deleted file mode 100644 index ce8dc530d40..00000000000 --- a/service/host.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package service // import "go.opentelemetry.io/collector/service" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/service/extensions" - "go.opentelemetry.io/collector/service/internal/graph" -) - -// TODO: remove as part of https://github.com/open-telemetry/opentelemetry-collector/issues/7370 for service 1.0 -type getExporters interface { - GetExporters() map[component.DataType]map[component.ID]component.Component -} - -var _ getExporters = (*serviceHost)(nil) -var _ component.Host = (*serviceHost)(nil) - -type serviceHost struct { - asyncErrorChannel chan error - receivers *receiver.Builder - processors *processor.Builder - exporters *exporter.Builder - connectors *connector.Builder - extensions *extension.Builder - - buildInfo component.BuildInfo - - pipelines *graph.Graph - serviceExtensions *extensions.Extensions -} - -func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { - switch kind { - case component.KindReceiver: - return host.receivers.Factory(componentType) - case component.KindProcessor: - return host.processors.Factory(componentType) - case component.KindExporter: - return host.exporters.Factory(componentType) - case component.KindConnector: - return host.connectors.Factory(componentType) - case component.KindExtension: - return host.extensions.Factory(componentType) - } - return nil -} - -func (host *serviceHost) GetExtensions() map[component.ID]component.Component { - return host.serviceExtensions.GetExtensions() -} - -// Deprecated: [0.79.0] This function will be removed in the future. -// Several components in the contrib repository use this function so it cannot be removed -// before those cases are removed. In most cases, use of this function can be replaced by a -// connector. See https://github.com/open-telemetry/opentelemetry-collector/issues/7370 and -// https://github.com/open-telemetry/opentelemetry-collector/pull/7390#issuecomment-1483710184 -// for additional information. -func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component { - return host.pipelines.GetExporters() -} - -func (host *serviceHost) notifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { - host.serviceExtensions.NotifyComponentStatusChange(source, event) - if event.Status() == component.StatusFatalError { - host.asyncErrorChannel <- event.Err() - } -} diff --git a/service/internal/graph/host.go b/service/internal/graph/host.go new file mode 100644 index 00000000000..b954d2b2c58 --- /dev/null +++ b/service/internal/graph/host.go @@ -0,0 +1,164 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "net/http" + "path" + "runtime" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/extensions" + "go.opentelemetry.io/collector/service/internal/zpages" +) + +// TODO: remove as part of https://github.com/open-telemetry/opentelemetry-collector/issues/7370 for service 1.0 +type getExporters interface { + GetExporters() map[component.DataType]map[component.ID]component.Component +} + +var _ getExporters = (*Host)(nil) +var _ component.Host = (*Host)(nil) + +type Host struct { + AsyncErrorChannel chan error + Receivers *receiver.Builder + Processors *processor.Builder + Exporters *exporter.Builder + Connectors *connector.Builder + Extensions *extension.Builder + + BuildInfo component.BuildInfo + + Pipelines *Graph + ServiceExtensions *extensions.Extensions +} + +func (host *Host) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + switch kind { + case component.KindReceiver: + return host.Receivers.Factory(componentType) + case component.KindProcessor: + return host.Processors.Factory(componentType) + case component.KindExporter: + return host.Exporters.Factory(componentType) + case component.KindConnector: + return host.Connectors.Factory(componentType) + case component.KindExtension: + return host.Extensions.Factory(componentType) + } + return nil +} + +func (host *Host) GetExtensions() map[component.ID]component.Component { + return host.ServiceExtensions.GetExtensions() +} + +// Deprecated: [0.79.0] This function will be removed in the future. +// Several components in the contrib repository use this function so it cannot be removed +// before those cases are removed. In most cases, use of this function can be replaced by a +// connector. See https://github.com/open-telemetry/opentelemetry-collector/issues/7370 and +// https://github.com/open-telemetry/opentelemetry-collector/pull/7390#issuecomment-1483710184 +// for additional information. +func (host *Host) GetExporters() map[component.DataType]map[component.ID]component.Component { + return host.Pipelines.GetExporters() +} + +func (host *Host) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { + host.ServiceExtensions.NotifyComponentStatusChange(source, event) + if event.Status() == component.StatusFatalError { + host.AsyncErrorChannel <- event.Err() + } +} + +const ( + // Paths + zServicePath = "servicez" + zPipelinePath = "pipelinez" + zExtensionPath = "extensionz" + zFeaturePath = "featurez" +) + +var ( + // InfoVar is a singleton instance of the Info struct. + runtimeInfoVar [][2]string +) + +func init() { + runtimeInfoVar = [][2]string{ + {"StartTimestamp", time.Now().String()}, + {"Go", runtime.Version()}, + {"OS", runtime.GOOS}, + {"Arch", runtime.GOARCH}, + // Add other valuable runtime information here. + } +} + +func (host *Host) RegisterZPages(mux *http.ServeMux, pathPrefix string) { + mux.HandleFunc(path.Join(pathPrefix, zServicePath), host.zPagesRequest) + mux.HandleFunc(path.Join(pathPrefix, zPipelinePath), host.Pipelines.HandleZPages) + mux.HandleFunc(path.Join(pathPrefix, zExtensionPath), host.ServiceExtensions.HandleZPages) + mux.HandleFunc(path.Join(pathPrefix, zFeaturePath), handleFeaturezRequest) +} + +func (host *Host) zPagesRequest(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Service " + host.BuildInfo.Command}) + zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Build Info", Properties: getBuildInfoProperties(host.BuildInfo)}) + zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Runtime Info", Properties: runtimeInfoVar}) + zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ + Name: "Pipelines", + ComponentEndpoint: zPipelinePath, + Link: true, + }) + zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ + Name: "Extensions", + ComponentEndpoint: zExtensionPath, + Link: true, + }) + zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ + Name: "Features", + ComponentEndpoint: zFeaturePath, + Link: true, + }) + zpages.WriteHTMLPageFooter(w) +} + +func handleFeaturezRequest(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Feature Gates"}) + zpages.WriteHTMLFeaturesTable(w, getFeaturesTableData()) + zpages.WriteHTMLPageFooter(w) +} + +func getFeaturesTableData() zpages.FeatureGateTableData { + data := zpages.FeatureGateTableData{} + featuregate.GlobalRegistry().VisitAll(func(gate *featuregate.Gate) { + data.Rows = append(data.Rows, zpages.FeatureGateTableRowData{ + ID: gate.ID(), + Enabled: gate.IsEnabled(), + Description: gate.Description(), + Stage: gate.Stage().String(), + FromVersion: gate.FromVersion(), + ToVersion: gate.ToVersion(), + ReferenceURL: gate.ReferenceURL(), + }) + }) + return data +} + +func getBuildInfoProperties(buildInfo component.BuildInfo) [][2]string { + return [][2]string{ + {"Command", buildInfo.Command}, + {"Description", buildInfo.Description}, + {"Version", buildInfo.Version}, + } +} diff --git a/service/service.go b/service/service.go index 88efd1adb91..642f7393652 100644 --- a/service/service.go +++ b/service/service.go @@ -69,7 +69,7 @@ type Settings struct { type Service struct { buildInfo component.BuildInfo telemetrySettings component.TelemetrySettings - host *serviceHost + host *graph.Host collectorConf *confmap.Conf reporter status.Reporter @@ -81,14 +81,14 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() srv := &Service{ buildInfo: set.BuildInfo, - host: &serviceHost{ - receivers: set.Receivers, - processors: set.Processors, - exporters: set.Exporters, - connectors: set.Connectors, - extensions: set.Extensions, - buildInfo: set.BuildInfo, - asyncErrorChannel: set.AsyncErrorChannel, + host: &graph.Host{ + Receivers: set.Receivers, + Processors: set.Processors, + Exporters: set.Exporters, + Connectors: set.Connectors, + Extensions: set.Extensions, + BuildInfo: set.BuildInfo, + AsyncErrorChannel: set.AsyncErrorChannel, }, collectorConf: set.CollectorConf, } @@ -136,7 +136,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, } - srv.reporter = status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) { + srv.reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) { if errors.Is(err, status.ErrStatusNotReady) { logger.Warn("Invalid transition", zap.Error(err)) } @@ -200,21 +200,21 @@ func (srv *Service) Start(ctx context.Context) error { // enable status reporting srv.reporter.Ready() - if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { + if err := srv.host.ServiceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } if srv.collectorConf != nil { - if err := srv.host.serviceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil { + if err := srv.host.ServiceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil { return err } } - if err := srv.host.pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil { + if err := srv.host.Pipelines.StartAll(ctx, srv.host, srv.reporter); err != nil { return fmt.Errorf("cannot start pipelines: %w", err) } - if err := srv.host.serviceExtensions.NotifyPipelineReady(); err != nil { + if err := srv.host.ServiceExtensions.NotifyPipelineReady(); err != nil { return err } @@ -257,15 +257,15 @@ func (srv *Service) Shutdown(ctx context.Context) error { // Begin shutdown sequence. srv.telemetrySettings.Logger.Info("Starting shutdown...") - if err := srv.host.serviceExtensions.NotifyPipelineNotReady(); err != nil { + if err := srv.host.ServiceExtensions.NotifyPipelineNotReady(); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } - if err := srv.host.pipelines.ShutdownAll(ctx, srv.reporter); err != nil { + if err := srv.host.Pipelines.ShutdownAll(ctx, srv.reporter); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) } - if err := srv.host.serviceExtensions.Shutdown(ctx); err != nil { + if err := srv.host.ServiceExtensions.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } @@ -282,9 +282,9 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e extensionsSettings := extensions.Settings{ Telemetry: srv.telemetrySettings, BuildInfo: srv.buildInfo, - Extensions: srv.host.extensions, + Extensions: srv.host.Extensions, } - if srv.host.serviceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil { + if srv.host.ServiceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.reporter)); err != nil { return fmt.Errorf("failed to build extensions: %w", err) } return nil @@ -293,7 +293,7 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e // Creates the pipeline graph. func (srv *Service) initGraph(ctx context.Context, set Settings, cfg Config) error { var err error - if srv.host.pipelines, err = graph.Build(ctx, graph.Settings{ + if srv.host.Pipelines, err = graph.Build(ctx, graph.Settings{ Telemetry: srv.telemetrySettings, BuildInfo: srv.buildInfo, ReceiverBuilder: set.Receivers, diff --git a/service/service_test.go b/service/service_test.go index 200b41899be..6e1e43159ec 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -227,6 +227,7 @@ func TestServiceGetExporters(t *testing.T) { assert.NoError(t, srv.Shutdown(context.Background())) }) + // nolint expMap := srv.host.GetExporters() assert.Len(t, expMap, 3) assert.Len(t, expMap[component.DataTypeTraces], 1) @@ -443,10 +444,10 @@ func TestServiceFatalError(t *testing.T) { go func() { ev := component.NewFatalErrorEvent(assert.AnError) - srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev) + srv.host.NotifyComponentStatusChange(&component.InstanceID{}, ev) }() - err = <-srv.host.asyncErrorChannel + err = <-srv.host.AsyncErrorChannel require.ErrorIs(t, err, assert.AnError) } diff --git a/service/zpages.go b/service/zpages.go deleted file mode 100644 index c7f7b494ad3..00000000000 --- a/service/zpages.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package service // import "go.opentelemetry.io/collector/service" - -import ( - "net/http" - "path" - "runtime" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/featuregate" - "go.opentelemetry.io/collector/service/internal/zpages" -) - -const ( - // Paths - zServicePath = "servicez" - zPipelinePath = "pipelinez" - zExtensionPath = "extensionz" - zFeaturePath = "featurez" -) - -var ( - // InfoVar is a singleton instance of the Info struct. - runtimeInfoVar [][2]string -) - -func init() { - runtimeInfoVar = [][2]string{ - {"StartTimestamp", time.Now().String()}, - {"Go", runtime.Version()}, - {"OS", runtime.GOOS}, - {"Arch", runtime.GOARCH}, - // Add other valuable runtime information here. - } -} - -func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) { - mux.HandleFunc(path.Join(pathPrefix, zServicePath), host.zPagesRequest) - mux.HandleFunc(path.Join(pathPrefix, zPipelinePath), host.pipelines.HandleZPages) - mux.HandleFunc(path.Join(pathPrefix, zExtensionPath), host.serviceExtensions.HandleZPages) - mux.HandleFunc(path.Join(pathPrefix, zFeaturePath), handleFeaturezRequest) -} - -func (host *serviceHost) zPagesRequest(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", "text/html; charset=utf-8") - zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Service " + host.buildInfo.Command}) - zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Build Info", Properties: getBuildInfoProperties(host.buildInfo)}) - zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Runtime Info", Properties: runtimeInfoVar}) - zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ - Name: "Pipelines", - ComponentEndpoint: zPipelinePath, - Link: true, - }) - zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ - Name: "Extensions", - ComponentEndpoint: zExtensionPath, - Link: true, - }) - zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ - Name: "Features", - ComponentEndpoint: zFeaturePath, - Link: true, - }) - zpages.WriteHTMLPageFooter(w) -} - -func handleFeaturezRequest(w http.ResponseWriter, _ *http.Request) { - w.Header().Set("Content-Type", "text/html; charset=utf-8") - zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Feature Gates"}) - zpages.WriteHTMLFeaturesTable(w, getFeaturesTableData()) - zpages.WriteHTMLPageFooter(w) -} - -func getFeaturesTableData() zpages.FeatureGateTableData { - data := zpages.FeatureGateTableData{} - featuregate.GlobalRegistry().VisitAll(func(gate *featuregate.Gate) { - data.Rows = append(data.Rows, zpages.FeatureGateTableRowData{ - ID: gate.ID(), - Enabled: gate.IsEnabled(), - Description: gate.Description(), - Stage: gate.Stage().String(), - FromVersion: gate.FromVersion(), - ToVersion: gate.ToVersion(), - ReferenceURL: gate.ReferenceURL(), - }) - }) - return data -} - -func getBuildInfoProperties(buildInfo component.BuildInfo) [][2]string { - return [][2]string{ - {"Command", buildInfo.Command}, - {"Description", buildInfo.Description}, - {"Version", buildInfo.Version}, - } -}