diff --git a/.chloggen/issue-7047.yaml b/.chloggen/issue-7047.yaml new file mode 100644 index 00000000000..472e532d762 --- /dev/null +++ b/.chloggen/issue-7047.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumererror + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce an `Error` type that allows recording contextual information + +# One or more tracking issues or pull requests related to the change +issues: [7047] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Currently allows recording status codes on consumer errors, + but will be expanded in the future to record additional data. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/client/go.mod b/client/go.mod index 7b759acbca9..b42f453d0ea 100644 --- a/client/go.mod +++ b/client/go.mod @@ -18,9 +18,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/client/go.sum b/client/go.sum index 2d7de4069f1..5b43b909021 100644 --- a/client/go.sum +++ b/client/go.sum @@ -43,20 +43,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/consumer/consumererror/README.md b/consumer/consumererror/README.md new file mode 100644 index 00000000000..a76011b3b27 --- /dev/null +++ b/consumer/consumererror/README.md @@ -0,0 +1,297 @@ +# Consumer errors + +This package contains error types that should be returned by a consumer when an +error occurs while processing telemetry. The error types included in this +package provide functionality for communicating details about the error for use +upstream in the pipeline. Ideally the error returned by a component in its +`consume` function should be from this package. 
+ +## Error classes + +**Retryable**: Errors are retryable if re-submitting data to a sink may result +in a successful submission. + +**Permanent**: Errors are permanent if submission will always fail for the +current data. Errors are considered permanent unless they are explicitly marked +as retryable. + +## Use cases + +**Retry logic**: Errors should be allowed to include information necessary to +perform retries. + +**Indicating partial success**: Errors can indicate that not all items were +accepted, for example as in an OTLP partial success message. OTLP backends will +return failed item counts if a partial success occurs, and this information can +be propagated up to a receiver and returned to the caller. + +**Communicating network error codes**: Errors should allow embedding information +necessary for the Collector to act as a proxy for a backend, i.e. relay a status +code returned from a backend in a response to a system upstream from the +Collector. + +## Current targets for using errors + +**Receivers**: Receivers should be able to consume multiple errors downstream +and determine the best course of action based on the user's configuration. This +may entail either keeping the retry queue inside of the Collector by having the +receiver keep track of retries, or may involve having the caller manage the +retry queue by returning a retryable network code with relevant information. + +**scraperhelper**: The scraper helper can use information about errors from +downstream to affect scraping. For example, if the backend is struggling with +the volume of data, scraping could be slowed, or the amount of data collected +could be reduced. + +**exporterhelper**: When an exporter returns a retryable error, the +exporterhelper can use this information to retry. Permanent errors will be +forwarded back up the pipeline. + +**obsreport**: Recording partial success information can ensure we correctly +track the number of failed telemetry records in the pipeline. 
Right now, all +records will be considered to be failed, which isn't accurate when partial +successes occur. + +## Creating Errors + +Errors can be created by calling `consumererror.New(err, opts...)` where `err` +is the underlying error, and `opts` is one of the provided options for supplying +additional metadata: + +- `consumererror.WithGRPCStatus` +- `consumererror.WithHTTPStatus` + +**Example**: + +```go +consumererror.New(err, + consumererror.WithGRPCStatus(codes.InvalidArgument), +) +``` + +The following options are not currently available, but may be made available in +the future: + +- `consumererror.WithRetry` +- `consumererror.WithPartial` +- `consumererror.WithMetadata` + +All options can be combined; we assume that the component knows what it is doing +when supplying seemingly conflicting options. + +Two examples: + +- `WithRetry` and `WithPartial` are included together: Partial successes are + considered permanent errors in OTLP, which conflicts with making an error + retryable by including `WithRetry`. However, per our definition of what makes + a permanent error, this error has been marked as retryable, and therefore we + assume the component producing this error supports retryable partial success + errors. +- `WithGRPCStatus` and `WithHTTPStatus` are included together: While the + component likely only sent data over one of these transports, our errors will + produce the given status if it is included on the error, otherwise it will + translate a status from the status for the other transport. If both of these + are given, we assume the component wanted to perform its own status + conversion, and we will simply give the status for the requested transport + without performing any conversion. + +### Retrying data submission + +> [!WARNING] This function is currently in the design phase. It is not available +> and may not be added. The below is a design describing how this may work.
+ +If an error is transient, the `WithRetry` option corresponding to the relevant +signal should be used to indicate that the error is retryable and to pass on any +retry settings. These settings can come from the data sink or be determined by +the component, such as through runtime conditions or from user settings. + +The data for the retry will be provided by the component performing the retry. +This will require all processing to be completely redone; in the future, +including data from the failed component so as to not retry this processing may +be made available as an option. + +To ensure only the failed pipeline branch is retried, the sequence of components +that created the error will be recorded by a pipeline utility as the error goes +back up the pipeline. + +**Note**: If retry information is not included in an error, the error will be +considered permanent and will not be retried. + +**Usage:** + +```go +consumererror.WithRetry( + consumererror.WithRetryDelay(10 * time.Second) +) +``` + +The delay is an optional setting that can be provided if it is available. + +### Indicating partial success + +> [!WARNING] This function is currently in the design phase. It is not available +> and may not be added. The below is a design describing how this may work. + +If the component receives an OTLP partial success message (or other indication +of partial success), it should include this information with a count of the +failed records. + +**Usage:** + +```go +consumererror.WithPartial(failedRecords) +``` + +### Indicating error codes from network transports + +If the failure occurred due to a network transaction, the exporter should record +the status code of the message received from the backend. This information can +then be relayed to the receiver caller if necessary. Note that when the upstream +component reads a code, it will read a code for its desired transport, and the +code may be translated depending on whether the input and output transports are +different.
For example, a gRPC exporter may record a gRPC status. If a gRPC +receiver reads this status, it will be exactly the provided status. If an HTTP +receiver reads the status, it wants an HTTP status, and the gRPC status will be +converted to an equivalent HTTP code. + +**Usage:** + +```go +consumererror.WithGRPCStatus(codes.InvalidArgument) +consumererror.WithHTTPStatus(http.StatusTooManyRequests) +``` + +### Including custom data + +> [!WARNING] This function is currently in the design phase. It is not available +> and may not be added. The below is a design describing how this may work. + +Custom data can be included as well for any additional information that needs to +be propagated back up the pipeline. It is up to the consuming component if or +how this data will be used. + +**Usage:** + +```go +consumererror.WithMetadata(MyMetadataStruct{}) +``` + +To keep error analysis simple when looking at an error upstream in a pipeline, +the component closest to the source of an error or set of errors should make a +decision about the nature of the error. The following are a few places where +special considerations may need to be made. + +## Using errors + +### Fanouts + +When a fanout receives multiple errors, it will combine them with +`consumererror.Combine(errs...)` and pass them upstream. The upstream +component can then later pull all errors out for analysis. + +### Retrieving errors + +> [!WARNING] This functionality is currently experimental, and the description +> here is for design purposes. The code snippet may not work as-written. + +When a receiver gets a response that includes an error, it can get the data out +by doing something similar to the following: + +```go +err := nextConsumer.ConsumeLogs(ctx, ld) +cerr := &consumererror.Error{} + +if errors.As(err, &cerr) { + cerr.HTTPStatus() + cerr.Retryable() + cerr.Partial() +} +``` + +### Error data + +> [!WARNING] The description below is a design proposal for how this +> functionality may work.
See `error.go` within this package for the current +> functionality. + +Obtaining data from an error can be done using an interface that looks something +like this: + +```go +type Error interface { + // Returns the underlying error + Error() error + + // Second argument is `false` if no code is available. + HTTPStatus() (int, bool) + + // Second argument is `false` if no code is available. + GRPCStatus() (*status.Status, bool) + + // Second argument is `false` if no retry information is available. + Retryable() (Retryable, bool) + + // Second argument is `false` if no partial counts were recorded. + Partial() (Partial, bool) +} + +type Retryable struct {} + +// Returns nil if no delay was set, indicating to use the default. +// This makes it so a delay of `0` indicates to resend immediately. +func (r *Retryable) Delay() *time.Duration {} + +type Partial struct {} +``` + +## Other considerations + +### Mixed error classes + +When a receiver sees a mixture of permanent and retryable errors from downstream +in the pipeline, it must first consider whether retries are enabled within the +Collector. + +**Retries are enabled**: Ignore the permanent errors, retry data submission for +only components that indicated retryable errors. + +**Retries are disabled**: In an asynchronous pipeline, simply do not retry any +data. In a synchronous pipeline, the receiver should return a permanent error +code indicating to the caller that it should not retry the data submission. This +is intended to not induce extra failures where we know the data submission will +fail, but this behavior could be made configurable by the user. + +### Signal conversion + +When converting between signals in a pipeline, it is expected that the connector +performing the conversion should perform the translation necessary in the error +for any signal item counts. If the converted count cannot be determined, the +full count of pre-converted signals should be returned. 
+ +### Asynchronous processing + +The use of any components that do asynchronous processing in a pipeline will cut +off error backpropagation at the asynchronous component. The asynchronous +component may communicate error information using the Collector's own signals. + +## Transitioning + +> [!WARNING] This functionality is currently in the design phase. It is not +> available and may not be added. The below is a design describing how this may +> work. + +The following describes how to transition to these error types: + +- `NewPermanent`: To transition to new permanent errors, call + `consumererror.New` with the relevant metadata included in the invocation. + Errors will be permanent by default going forward. +- `New[Traces|Metrics|Logs]`: These functions will be deprecated in favor of + having the caller provide the data to retry. Current uses can invoke + `consumererror.New` with the `WithRetry` option to retry a request. +- `exporterhelper.NewThrottleRetry`: This will be replaced with `WithRetry`, and + can follow a similar approach as above. + +`consumererror.IsPermanent` will be deprecated in favor of checking whether +retry information is available, and only retrying if it has been provided. This +will be possible by calling `Error.Retryable()` and checking for retry +information. diff --git a/consumer/consumererror/error.go b/consumer/consumererror/error.go new file mode 100644 index 00000000000..f4d59871153 --- /dev/null +++ b/consumer/consumererror/error.go @@ -0,0 +1,173 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" + +import ( + "errors" + "net/http" + + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" +) + +// Error is intended to be used to encapsulate various information that can add +// context to an error that occurred within a pipeline component. 
Error objects +// are constructed through calling `New` with the relevant options to capture +// data around the error that occurred. +// +// It may hold multiple errors from downstream components, and can be merged +// with other errors as it travels upstream using `Combine`. The `Error` should +// be obtained from a given `error` object using `errors.As`. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +type Error struct { + error + httpStatus int + grpcStatus *status.Status + partialMsg string + partialRejected int64 +} + +var _ error = (*Error)(nil) + +// ErrorOption allows annotating an Error with metadata. +type ErrorOption interface { + applyOption(*Error) +} + +type errorOptionFunc func(*Error) + +func (f errorOptionFunc) applyOption(e *Error) { + f(e) +} + +// New wraps an error that happened while consuming telemetry and adds metadata +// onto it to be passed back up the pipeline. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func New(origErr error, options ...ErrorOption) error { + err := &Error{error: origErr} + + for _, option := range options { + option.applyOption(err) + } + + return err +} + +// Combine joins errors into a single `Error` object. The component that +// initiated the data submission can then work with the `Error` object to +// understand the failure. 
+func Combine(errs ...error) error { + e := &Error{error: errors.Join(errs...)} + + resultingStatus := 0 + + for _, err := range errs { + var otherErr *Error + if errors.As(err, &otherErr) { + if otherErr.httpStatus != 0 { + resultingStatus = aggregateStatuses(resultingStatus, otherErr.httpStatus) + } else if otherErr.grpcStatus != nil { + resultingStatus = aggregateStatuses(resultingStatus, statusconversion.GetHTTPStatusCodeFromStatus(otherErr.grpcStatus)) + } + } + } + + if resultingStatus != 0 { + e.httpStatus = resultingStatus + } + + return e +} + +// WithHTTPStatus records an HTTP status code that was received from a server +// during data submission. +func WithHTTPStatus(status int) ErrorOption { + return errorOptionFunc(func(err *Error) { + err.httpStatus = status + }) +} + +// WithGRPCStatus records a gRPC status code that was received from a server +// during data submission. +func WithGRPCStatus(status *status.Status) ErrorOption { + return errorOptionFunc(func(err *Error) { + err.grpcStatus = status + }) +} + +// Experimental function for demonstration purposes only +func WithExperimentalPartial(msg string, rejected int64) ErrorOption { + return errorOptionFunc(func(err *Error) { + err.partialMsg = msg + err.partialRejected = rejected + }) +} + +// Error implements the error interface. +func (e *Error) Error() string { + return e.error.Error() +} + +// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`. +func (e *Error) Unwrap() error { + return e.error +} + +// HTTPStatus returns an HTTP status code either directly set by the source or +// derived from a gRPC status code set by the source. If both statuses are set, +// the HTTP status code is returned. +// +// If no code has been set, the second return value is `false`. 
+func (e *Error) HTTPStatus() (int, bool) { + if e.httpStatus != 0 { + return e.httpStatus, true + } else if e.grpcStatus != nil { + return statusconversion.GetHTTPStatusCodeFromStatus(e.grpcStatus), true + } + + return 0, false +} + +// GRPCStatus returns a gRPC status code either directly set by the source or +// derived from an HTTP status code set by the source. If both statuses are set, +// the gRPC status code is returned. +// +// If no code has been set, the second return value is `false`. +func (e *Error) GRPCStatus() (*status.Status, bool) { + if e.grpcStatus != nil { + return e.grpcStatus, true + } else if e.httpStatus != 0 { + return statusconversion.NewStatusFromMsgAndHTTPCode(e.Error(), e.httpStatus), true + } + + return nil, false +} + +// Experimental function for demonstration purposes only +func (e *Error) ExperimentalPartialSuccess() (string, int64, bool) { + return e.partialMsg, e.partialRejected, e.partialMsg != "" || e.partialRejected != 0 +} + +func aggregateStatuses(a int, b int) int { + switch { + // If a is unset, keep b. b is guaranteed to be non-zero by the caller. + case a == 0: + return b + // If a and b have been set and one is a 5xx code, the correct code is + // ambiguous, so return a 500, which is permanent. + case a >= 500 || b >= 500: + return http.StatusInternalServerError + // If a and b have been set and one is a 4xx code, the correct code is + // ambiguous, so return a 400, which is permanent.
+ case (a >= 400 && a < 500) || (b >= 400 && b < 500): + return http.StatusBadRequest + default: + return http.StatusInternalServerError + } +} diff --git a/consumer/consumererror/error_test.go b/consumer/consumererror/error_test.go new file mode 100644 index 00000000000..1f380127930 --- /dev/null +++ b/consumer/consumererror/error_test.go @@ -0,0 +1,306 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror + +import ( + "errors" + "net/http" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var errTest = errors.New("consumererror testing error") + +func Test_New(t *testing.T) { + httpStatus := 500 + grpcStatus := status.New(codes.Aborted, "aborted") + wantErr := &Error{ + error: errTest, + httpStatus: httpStatus, + grpcStatus: grpcStatus, + } + + newErr := New(errTest, + WithHTTPStatus(httpStatus), + WithGRPCStatus(grpcStatus), + ) + + require.Equal(t, wantErr, newErr) +} + +func Test_Error(t *testing.T) { + newErr := New(errTest) + + require.Equal(t, errTest.Error(), newErr.Error()) +} + +func TestUnwrap(t *testing.T) { + err := &Error{ + error: errTest, + } + + unwrapped := err.Unwrap() + + require.Equal(t, errTest, unwrapped) +} + +func TestCombine(t *testing.T) { + err0 := &Error{} + err1 := &Error{} + want := &Error{error: errors.Join(err0, err1, errTest)} + + err := Combine(err0, err1, errTest) + + require.Equal(t, want, err) + +} + +func TestCombineStatusAggregation(t *testing.T) { + cases := []struct { + name string + errors []error + wantHTTP int + wantGRPC codes.Code + }{ + { + name: "No status codes", + errors: []error{}, + }, + { + name: "One status code first", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest), + }, + wantHTTP: http.StatusTooManyRequests, + wantGRPC: codes.ResourceExhausted, + }, + { + name: "One status code second", + errors: []error{ + New(errTest), + 
New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + }, + wantHTTP: http.StatusTooManyRequests, + wantGRPC: codes.ResourceExhausted, + }, + { + name: "Two HTTP statuses", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest, WithHTTPStatus(http.StatusNotFound)), + }, + wantHTTP: http.StatusBadRequest, + wantGRPC: codes.InvalidArgument, + }, + { + name: "Three HTTP statuses", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest, WithHTTPStatus(http.StatusNotFound)), + New(errTest, WithHTTPStatus(http.StatusUnauthorized)), + }, + wantHTTP: http.StatusBadRequest, + wantGRPC: codes.InvalidArgument, + }, + { + name: "Two gRPC statuses", + errors: []error{ + New(errTest, WithGRPCStatus(status.New(codes.PermissionDenied, ""))), + New(errTest, WithGRPCStatus(status.New(codes.Unauthenticated, ""))), + }, + wantHTTP: http.StatusBadRequest, + wantGRPC: codes.InvalidArgument, + }, + { + name: "One HTTP, one gRPC status", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest, WithGRPCStatus(status.New(codes.PermissionDenied, ""))), + }, + wantHTTP: http.StatusBadRequest, + wantGRPC: codes.InvalidArgument, + }, + { + name: "Two 4xx", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest, WithHTTPStatus(http.StatusUnauthorized)), + }, + wantHTTP: http.StatusBadRequest, + wantGRPC: codes.InvalidArgument, + }, + { + name: "One 4xx, one 5xx", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusTooManyRequests)), + New(errTest, WithHTTPStatus(http.StatusServiceUnavailable)), + }, + wantHTTP: http.StatusInternalServerError, + wantGRPC: codes.Unknown, + }, + { + name: "Two 5xx", + errors: []error{ + New(errTest, WithGRPCStatus(status.New(codes.DeadlineExceeded, ""))), + New(errTest, WithHTTPStatus(http.StatusBadGateway)), + }, + wantHTTP: http.StatusInternalServerError, + wantGRPC: codes.Unknown, + }, + { + name: 
"Neither 4xx nor 5xx", + errors: []error{ + New(errTest, WithHTTPStatus(http.StatusPermanentRedirect)), + New(errTest, WithHTTPStatus(http.StatusTemporaryRedirect)), + }, + wantHTTP: http.StatusInternalServerError, + wantGRPC: codes.Unknown, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + e := Combine(tt.errors...) + ce := &Error{} + + if errors.As(e, &ce) { + if tt.wantHTTP != 0 { + status, ok := ce.HTTPStatus() + require.True(t, ok) + require.Equal(t, tt.wantHTTP, status) + } + + if tt.wantGRPC != codes.OK { + status, ok := ce.GRPCStatus() + require.True(t, ok) + require.Equal(t, tt.wantGRPC, status.Code()) + } + } else { + require.Fail(t, "Combine did not return an Error type") + } + }) + } +} + +func TestError_Error(t *testing.T) { + err := &Error{ + error: errTest, + } + + require.Equal(t, errTest.Error(), err.Error()) +} + +func TestError_Unwrap(t *testing.T) { + err := &Error{ + error: errTest, + } + + require.Equal(t, errTest, err.Unwrap()) +} + +func TestError_HTTPStatus(t *testing.T) { + serverErr := http.StatusTooManyRequests + testCases := []struct { + name string + httpStatus int + grpcStatus *status.Status + want int + hasCode bool + }{ + { + name: "Passes through HTTP status", + httpStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "Converts gRPC status", + grpcStatus: status.New(codes.ResourceExhausted, errTest.Error()), + want: serverErr, + hasCode: true, + }, + { + name: "Passes through HTTP status when gRPC status also present", + httpStatus: serverErr, + grpcStatus: status.New(codes.OK, errTest.Error()), + want: serverErr, + hasCode: true, + }, + { + name: "No statuses set", + hasCode: false, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := Error{ + error: errTest, + httpStatus: tt.httpStatus, + grpcStatus: tt.grpcStatus, + } + + s, ok := err.HTTPStatus() + + require.Equal(t, tt.hasCode, ok) + require.Equal(t, tt.want, s) + }) + } +} + +func 
TestError_GRPCStatus(t *testing.T) { + httpStatus := http.StatusTooManyRequests + otherHTTPStatus := http.StatusOK + serverErr := status.New(codes.ResourceExhausted, errTest.Error()) + testCases := []struct { + name string + httpStatus int + grpcStatus *status.Status + want *status.Status + hasCode bool + }{ + { + name: "Converts HTTP status", + httpStatus: httpStatus, + want: serverErr, + hasCode: true, + }, + { + name: "Passes through gRPC status", + grpcStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "Passes through gRPC status when gRPC status also present", + httpStatus: otherHTTPStatus, + grpcStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "No statuses set", + hasCode: false, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := Error{ + error: errTest, + httpStatus: tt.httpStatus, + grpcStatus: tt.grpcStatus, + } + + s, ok := err.GRPCStatus() + + require.Equal(t, tt.hasCode, ok) + require.Equal(t, tt.want, s) + }) + } +} diff --git a/consumer/consumererror/internal/statusconversion/conversion.go b/consumer/consumererror/internal/statusconversion/conversion.go new file mode 100644 index 00000000000..4c34cd0681c --- /dev/null +++ b/consumer/consumererror/internal/statusconversion/conversion.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package statusconversion // import "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" + +import ( + "net/http" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func GetHTTPStatusCodeFromStatus(s *status.Status) int { + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + // to see if a code is retryable. + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 + // to see a list of retryable http status codes. 
+ switch s.Code() { + // Retryable + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return http.StatusServiceUnavailable + // Retryable + case codes.ResourceExhausted: + return http.StatusTooManyRequests + // Not Retryable + case codes.InvalidArgument: + return http.StatusBadRequest + // Not Retryable + case codes.Unauthenticated: + return http.StatusUnauthorized + // Not Retryable + case codes.PermissionDenied: + return http.StatusForbidden + // Not Retryable + case codes.Unimplemented: + return http.StatusNotFound + // Not Retryable + default: + return http.StatusInternalServerError + } +} + +func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status { + var c codes.Code + // Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + // 429 mapping to ResourceExhausted and 400 mapping to InvalidArgument are exceptions. + switch statusCode { + case http.StatusBadRequest: + c = codes.InvalidArgument + case http.StatusUnauthorized: + c = codes.Unauthenticated + case http.StatusForbidden: + c = codes.PermissionDenied + case http.StatusNotFound: + c = codes.Unimplemented + case http.StatusTooManyRequests: + c = codes.ResourceExhausted + case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + c = codes.Unavailable + default: + c = codes.Unknown + } + return status.New(c, errMsg) +} diff --git a/consumer/consumererror/internal/statusconversion/conversion_test.go b/consumer/consumererror/internal/statusconversion/conversion_test.go new file mode 100644 index 00000000000..99316253c96 --- /dev/null +++ b/consumer/consumererror/internal/statusconversion/conversion_test.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package statusconversion // import "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" + +import ( + "net/http" + "testing" +
+ "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Test_GetHTTPStatusCodeFromStatus(t *testing.T) { + tests := []struct { + name string + input *status.Status + expected int + }{ + { + name: "Retryable Status", + input: status.New(codes.Unavailable, "test"), + expected: http.StatusServiceUnavailable, + }, + { + name: "Non-retryable Status", + input: status.New(codes.InvalidArgument, "test"), + expected: http.StatusBadRequest, + }, + { + name: "Specifically 429", + input: status.New(codes.ResourceExhausted, "test"), + expected: http.StatusTooManyRequests, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetHTTPStatusCodeFromStatus(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func Test_ErrorMsgAndHTTPCodeToStatus(t *testing.T) { + tests := []struct { + name string + errMsg string + statusCode int + expected *status.Status + }{ + { + name: "Bad Request", + errMsg: "test", + statusCode: http.StatusBadRequest, + expected: status.New(codes.InvalidArgument, "test"), + }, + { + name: "Unauthorized", + errMsg: "test", + statusCode: http.StatusUnauthorized, + expected: status.New(codes.Unauthenticated, "test"), + }, + { + name: "Forbidden", + errMsg: "test", + statusCode: http.StatusForbidden, + expected: status.New(codes.PermissionDenied, "test"), + }, + { + name: "Not Found", + errMsg: "test", + statusCode: http.StatusNotFound, + expected: status.New(codes.Unimplemented, "test"), + }, + { + name: "Too Many Requests", + errMsg: "test", + statusCode: http.StatusTooManyRequests, + expected: status.New(codes.ResourceExhausted, "test"), + }, + { + name: "Bad Gateway", + errMsg: "test", + statusCode: http.StatusBadGateway, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Service Unavailable", + errMsg: "test", + statusCode: http.StatusServiceUnavailable, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Gateway 
Timeout", + errMsg: "test", + statusCode: http.StatusGatewayTimeout, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Unsupported Media Type", + errMsg: "test", + statusCode: http.StatusUnsupportedMediaType, + expected: status.New(codes.Unknown, "test"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := NewStatusFromMsgAndHTTPCode(tt.errMsg, tt.statusCode) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/consumer/consumererror/permanent.go b/consumer/consumererror/permanent.go index 71e26df214f..30274cec195 100644 --- a/consumer/consumererror/permanent.go +++ b/consumer/consumererror/permanent.go @@ -11,6 +11,8 @@ type permanent struct { err error } +var _ error = permanent{} + // NewPermanent wraps an error to indicate that it is a permanent error, i.e. an // error that will be always returned if its source receives the same inputs. func NewPermanent(err error) error { @@ -26,9 +28,10 @@ func (p permanent) Unwrap() error { return p.err } -// IsPermanent checks if an error was wrapped with the NewPermanent function, which -// is used to indicate that a given error will always be returned in the case -// that its sources receives the same input. +// IsPermanent checks if an error was wrapped with the NewPermanent function or +// otherwise is not a retryable error. A `true` return value indicates that a +// given error will always be returned in the case that its sources receives the +// same input. 
func IsPermanent(err error) bool { if err == nil { return false diff --git a/consumer/consumerprofiles/go.mod b/consumer/consumerprofiles/go.mod index 143439883d3..13ab97d91b5 100644 --- a/consumer/consumerprofiles/go.mod +++ b/consumer/consumerprofiles/go.mod @@ -23,9 +23,9 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/pdata v1.14.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/consumer/consumerprofiles/go.sum b/consumer/consumerprofiles/go.sum index 528166b78c0..4c9c65b576f 100644 --- a/consumer/consumerprofiles/go.sum +++ b/consumer/consumerprofiles/go.sum @@ -42,20 +42,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/consumer/consumertest/go.mod b/consumer/consumertest/go.mod index f8ad910b393..8b39103dce0 100644 --- a/consumer/consumertest/go.mod +++ b/consumer/consumertest/go.mod @@ -22,9 +22,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // 
indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/consumer/consumertest/go.sum b/consumer/consumertest/go.sum index 528166b78c0..4c9c65b576f 100644 --- a/consumer/consumertest/go.sum +++ b/consumer/consumertest/go.sum @@ -42,20 +42,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod 
h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/consumer/go.mod b/consumer/go.mod index a62b1199aea..9e20fe9cb02 100644 --- a/consumer/go.mod +++ b/consumer/go.mod @@ -5,8 +5,9 @@ go 1.22.0 require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/pdata v1.14.1 - go.opentelemetry.io/collector/pdata/testdata v0.108.1 + go.opentelemetry.io/collector/pdata/testdata v0.105.0 go.uber.org/goleak v1.3.0 + google.golang.org/grpc v1.65.0 ) require ( @@ -18,11 +19,10 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.108.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/grpc v1.65.0 // indirect 
google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/consumer/go.sum b/consumer/go.sum index 528166b78c0..8ce637683be 100644 --- a/consumer/go.sum +++ b/consumer/go.sum @@ -42,20 +42,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text 
v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/exporter/exporterhelper/obsexporter.go b/exporter/exporterhelper/obsexporter.go index 751aaf1aac5..8eb95de856e 100644 --- a/exporter/exporterhelper/obsexporter.go +++ b/exporter/exporterhelper/obsexporter.go @@ -5,6 +5,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" + "errors" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -12,6 +13,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" @@ -60,7 +62,18 @@ func (or *obsReport) startTracesOp(ctx context.Context) context.Context { // endTracesOp completes the export operation that was started with startTracesOp. 
func (or *obsReport) endTracesOp(ctx context.Context, numSpans int, err error) { - numSent, numFailedToSend := toNumItems(numSpans, err) + var numSent, numFailedToSend int64 + ce := &consumererror.Error{} + + if errors.As(err, &ce) { + _, partialCount, ok := ce.ExperimentalPartialSuccess() + + if ok { + numFailedToSend = partialCount + numSent = int64(numSpans) - numFailedToSend + } + } + or.recordMetrics(context.WithoutCancel(ctx), component.DataTypeTraces, numSent, numFailedToSend) endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey) } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index b8d6dee2a75..18462aa4180 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -9,7 +9,6 @@ import ( "runtime" "time" - "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -86,15 +85,13 @@ func (e *baseExporter) shutdown(context.Context) error { func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) 
- if err := processError(respErr); err != nil { - return err - } + partial := partialSuccess{} partialSuccess := resp.PartialSuccess() - if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { - e.settings.Logger.Warn("Partial success response", - zap.String("message", resp.PartialSuccess().ErrorMessage()), - zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()), - ) + partial.msg = partialSuccess.ErrorMessage() + partial.rejected = partialSuccess.RejectedSpans() + + if err := processError(respErr, partial); err != nil { + return err } return nil } @@ -102,15 +99,13 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err := processError(respErr); err != nil { - return err - } + partial := partialSuccess{} partialSuccess := resp.PartialSuccess() - if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { - e.settings.Logger.Warn("Partial success response", - zap.String("message", resp.PartialSuccess().ErrorMessage()), - zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()), - ) + partial.msg = partialSuccess.ErrorMessage() + partial.rejected = partialSuccess.RejectedDataPoints() + + if err := processError(respErr, partial); err != nil { + return err } return nil } @@ -118,15 +113,13 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) 
- if err := processError(respErr); err != nil { - return err - } + partial := partialSuccess{} partialSuccess := resp.PartialSuccess() - if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { - e.settings.Logger.Warn("Partial success response", - zap.String("message", resp.PartialSuccess().ErrorMessage()), - zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()), - ) + partial.msg = partialSuccess.ErrorMessage() + partial.rejected = partialSuccess.RejectedLogRecords() + + if err := processError(respErr, partial); err != nil { + return err } return nil } @@ -138,15 +131,21 @@ func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { return ctx } -func processError(err error) error { - if err == nil { +func processError(err error, partial partialSuccess) error { + opts := []consumererror.ErrorOption{} + + if !(partial.msg == "" && partial.rejected == 0) { + opts = append(opts, consumererror.WithExperimentalPartial(partial.msg, partial.rejected)) + } + + if err == nil && len(opts) == 0 { // Request is successful, we are done. return nil } // We have an error, check gRPC status code. st := status.Convert(err) - if st.Code() == codes.OK { + if st.Code() == codes.OK && len(opts) == 0 { // Not really an error, still success. return nil } @@ -156,7 +155,7 @@ func processError(err error) error { if !shouldRetry(st.Code(), retryInfo) { // It is not a retryable error, we should not retry. - return consumererror.NewPermanent(err) + return err } // Check if server returned throttling information. @@ -166,8 +165,7 @@ func processError(err error) error { return exporterhelper.NewThrottleRetry(err, throttleDuration) } - // Need to retry. 
- return err + return exporterhelper.NewThrottleRetry(err, 5*time.Second) } func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool { @@ -207,3 +205,8 @@ func getThrottleDuration(t *errdetails.RetryInfo) time.Duration { } return 0 } + +type partialSuccess struct { + msg string + rejected int64 +} diff --git a/receiver/otlpreceiver/internal/trace/otlp.go b/receiver/otlpreceiver/internal/trace/otlp.go index 615b597f05e..b58e667d8ba 100644 --- a/receiver/otlpreceiver/internal/trace/otlp.go +++ b/receiver/otlpreceiver/internal/trace/otlp.go @@ -5,10 +5,12 @@ package trace // import "go.opentelemetry.io/collector/receiver/otlpreceiver/int import ( "context" + "errors" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" - "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" + otlperrors "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/receiverhelper" ) @@ -49,7 +51,30 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) if err != nil { - return ptraceotlp.NewExportResponse(), errors.GetStatusFromError(err) + ce := &consumererror.Error{} + + if errors.As(err, &ce) { + res := ptraceotlp.NewExportResponse() + partialMsg, partialRejected, ok := ce.ExperimentalPartialSuccess() + + if ok { + res.PartialSuccess().SetErrorMessage(partialMsg) + res.PartialSuccess().SetRejectedSpans(partialRejected) + } + + var e error + s, ok := ce.GRPCStatus() + + if ok { + e = s.Err() + } else { + e = otlperrors.GetStatusFromError(err) + } + + return res, e + } + + return ptraceotlp.NewExportResponse(), otlperrors.GetStatusFromError(err) } return ptraceotlp.NewExportResponse(), nil diff --git 
a/receiver/otlpreceiver/otlphttp.go b/receiver/otlpreceiver/otlphttp.go index b895b8d2e7c..025dd0a5e85 100644 --- a/receiver/otlpreceiver/otlphttp.go +++ b/receiver/otlpreceiver/otlphttp.go @@ -4,6 +4,7 @@ package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlpreceiver" import ( + "errors" "fmt" "io" "mime" @@ -12,8 +13,8 @@ import ( spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/httphelper" - "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/logs" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/trace" @@ -149,9 +150,15 @@ func readAndCloseBody(resp http.ResponseWriter, req *http.Request, enc encoder) // writeError encodes the HTTP error inside a rpc.Status message as required by the OTLP protocol. func writeError(w http.ResponseWriter, encoder encoder, err error, statusCode int) { - s, ok := status.FromError(err) - if ok { - statusCode = errors.GetHTTPStatusCodeFromStatus(s) + var s *status.Status + ce := &consumererror.Error{} + if errors.As(err, &ce) { + code, ok := ce.HTTPStatus() + if ok { + statusCode = code + } else { + s = httphelper.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode) + } } else { s = httphelper.NewStatusFromMsgAndHTTPCode(err.Error(), statusCode) }