diff --git a/docs/rfcs/component-status-reporting.md b/docs/rfcs/component-status-reporting.md
new file mode 100644
index 00000000000..9dd243c5f1e
--- /dev/null
+++ b/docs/rfcs/component-status-reporting.md
@@ -0,0 +1,119 @@
+# Component Status Reporting
+
+## Overview
+
+Since the OpenTelemetry Collector is made up of pipelines of components, it needs a way for the components within those pipelines to emit information about their health. This information allows the collector service, or other interested software or people, to make decisions about how to proceed when something goes wrong. This document describes:
+
+1. The historical state of how components reported health
+2. The current state of how components report health
+3. The goals component health reporting should achieve
+4. Existing deviations from those goals
+5. Desired behavior for 1.0
+
+For context throughout this document, the `component` package defines a `component.Host` interface, which components may use to interact with the struct that manages all the collector pipelines and their components. In this repository, our implementation of `component.Host` can be found in `service/internal/graph.Host`.
+
+## Out Of Scope
+
+How to get from the current behavior to the desired behavior is considered out of scope and will be discussed in individual PRs. It will likely involve one or more feature gates, warnings, and transition periods.
+
+## The Collector’s Historical Method of Reporting Component Health
+
+Until recently, the Collector relied on four ways to report health:
+
+1. The `error` returned by the component’s `Start` method. During startup, if any component returned an error, the Collector would stop gracefully.
+2. The `component.Host.ReportFatalError` method. This method let components tell the `component.Host` that something bad happened and the collector needed to shut down. While this method could be called from anywhere in the component, it was primarily used within a component’s `Start` method to report errors in async work, such as starting a server.
+   ```golang
+   if errHTTP := fmr.server.Serve(listener); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) {
+       host.ReportFatalError(errHTTP)
+   }
+   ```
+3. The error returned by `Shutdown`. This error indicated that the collector did not shut down cleanly, but it did not prevent the shutdown process from moving forward.
+4. Panicking. During runtime, if the collector experienced an unhandled error, it would crash.
+
+These were the only ways the components in a collector could report that they were unhealthy.
+
+There are several major gaps in the Collector’s historical reporting of component health. First, many components returned recoverable errors from `Start`, causing the collector to shut down when it could have recovered had it been allowed to keep running. Second, when a component experienced a transient error, such as an endpoint suddenly not working, the component would simply log the error and return it up the pipeline; there was no mechanism for the component to tell the `component.Host`, or anything else, that something was going wrong. Last, when a component experienced an issue it could never recover from, such as receiving a 404 response from an endpoint, the component would also just log the error and return it up the pipeline. This situation was handled in the same way as a transient error, which meant the component could not tell the `component.Host`, or anything else, that something was wrong; worse, the issue would never get better.
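+
+To make the last two gaps concrete, the following hypothetical sketch (not taken from any real component; the type, fields, and endpoint behavior are illustrative assumptions) shows the only options a component had: log the error and return it up the pipeline, with no way to tell the `component.Host` anything.
+
+```golang
+package examplecomponent
+
+import (
+    "context"
+    "errors"
+    "net/http"
+
+    "go.uber.org/zap"
+)
+
+// examplePusher is a hypothetical component used only to illustrate the gap.
+type examplePusher struct {
+    logger *zap.Logger
+    client *http.Client
+    url    string
+}
+
+func (p *examplePusher) consume(ctx context.Context) error {
+    req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, nil)
+    if err != nil {
+        return err
+    }
+    resp, err := p.client.Do(req)
+    if err != nil {
+        // Transient error: logged and propagated, but the component.Host is never told.
+        p.logger.Warn("push failed", zap.Error(err))
+        return err
+    }
+    defer resp.Body.Close()
+    if resp.StatusCode == http.StatusNotFound {
+        // Permanent error: handled exactly like the transient case,
+        // even though it will never get better on its own.
+        p.logger.Error("endpoint not found", zap.String("url", p.url))
+        return errors.New("endpoint not found")
+    }
+    return nil
+}
+```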
+
+## Current State of Component Health Reporting
+
+See [Component Status Reporting](../component-status.md)
+
+## The Goals Component Health Reporting Should Achieve
+
+The following are the goals, as of June 2024 and with Collector 1.0 looming, for a component health reporting system.
+
+1. A `component.Host` implementation, such as `service/internal/graph.Host`, may report the statuses Starting, Ok, Stopping, and PermanentError on behalf of components.
+   - Additional statuses may be reported in the future.
+2. Components may opt in to reporting health status at runtime. Components must not be required to report health statuses themselves.
+   - The consumers of the health reporting system must be able to identify which components are and are not opting to report their own statuses.
+3. Component health reporting must be opt-in for collector users. While the underlying components are always allowed to report their health via the system, the `component.Host` implementation, such as `service/internal/graph.Host`, or any other listener may only take action when the user has configured the collector accordingly.
+   - As one example of compliance, the current health reporting system depends on the user configuring an extension that can watch for status updates.
+4. Component health must be representable as a finite state machine with clear transitions between states.
+5. Component health reporting must only be a mechanism for reporting health; it should have no mechanisms for taking action on the health it reports. How consumers of the health reporting system respond to component updates is not a concern of the health reporting system.
+
+## Existing deviations from those goals
+
+### Fatal Error Reporting
+
+Before the current implementation of component status reporting, a component could stop the collector by using `component.Host.ReportFatalError`. Now, a component MUST use component status reporting and emit a `FatalError`. This fact is in conflict with Goal 2, which states that component health reporting must be opt-in for components.
+
+A couple of possible solutions:
+1. Accept this reality as an exception to Goal 2.
+2. Add back `component.Host.ReportFatalError`.
+3. Remove the ability for components to stop the collector by removing `FatalError`.
+
+### No way to identify components that are not reporting status
+
+Goal 2 states that consumers of component status reporting must be able to identify components in use that have not opted in to component status reporting. Our current implementation does not have this feature.
+
+### Should component health reporting be an opt-in for `component.Host` implementations?
+
+The current implementation of component status reporting does not add anything to `component.Host` to force a `component.Host` implementation, such as `service/internal/graph.Host`, to be compatible with component status reporting. Instead, it adds `ReportStatus func(*StatusEvent)` to `component.TelemetrySettings`, and things that instantiate components, such as `service/internal/graph.Host`, should, but are not required to, pass in a value for `ReportStatus` (a sketch of what this looks like from a component’s perspective follows the options below).
+
+As a result, a `component.Host` implementation is not required to engage with the component status reporting system. This could lead to situations where a user adds a status watcher extension that can do nothing because the `component.Host` is not reporting component status updates.
+
+Is this acceptable? Should we:
+1. Require that `component.Host` implementations be compatible with the component status reporting framework?
+2. Add some sort of configuration/build flag that enforces that the `component.Host` implementation be compatible (or not) with component status reporting?
+3. Accept this edge case.
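+
+To illustrate the loose coupling described above, here is a minimal, hypothetical sketch of a component using the optional `ReportStatus` callback. It is not taken from the repository, and the `NewStatusEvent`/`StatusOK` names are assumptions based on the status API of the time:
+
+```golang
+package examplecomponent
+
+import (
+    "context"
+
+    "go.opentelemetry.io/collector/component"
+)
+
+type exampleReceiver struct {
+    set component.TelemetrySettings
+}
+
+func (r *exampleReceiver) Start(_ context.Context, _ component.Host) error {
+    // ReportStatus is optional: whatever instantiated this component may or may
+    // not have supplied it, so the component has to guard against nil.
+    if r.set.ReportStatus != nil {
+        // NewStatusEvent and StatusOK are assumed names for the status API of the time.
+        r.set.ReportStatus(component.NewStatusEvent(component.StatusOK))
+    }
+    return nil
+}
+```
+
+Under the Desired Behavior for 1.0 described below, the equivalent check becomes a type assertion on the `component.Host` passed to `Start`, for example against `componentstatus.Reporter`.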
+
+### Component TelemetrySettings Requirements
+
+The current implementation of component status reporting added a new field to `component.TelemetrySettings`, `ReportStatus`. This field is technically optional, but it would be marked as stable with component 1.0. Are we OK with one of the following?
+
+1. Including a component status reporting feature, `component.TelemetrySettings.ReportStatus`, in the 1.0 version of `component.TelemetrySettings`?
+2. Marking `component.TelemetrySettings.ReportStatus` as experimental via godoc comments in the 1.0 version of `component.TelemetrySettings`?
+
+Or should we refactor `component` somehow to remove `ReportStatus` from `component.TelemetrySettings`?
+
+## Desired Behavior for 1.0
+
+For each listed deviation, the solution for unblocking component 1.0 is:
+
+- `Fatal Error Reporting` :white_check_mark:: The `component` module provides no mechanism for a component to stop a collector after it has started. It is expected that an error returned from `Start` will terminate a starting Collector, but it is ultimately up to the caller of `Start` how to handle the returned error. A `component.Host` implementation may choose to provide a mechanism to stop a running collector via a different interface, but doing so is not required.
+  - As part of this stance, we agree that the `component.Component.Start` method will continue returning an error.
+- `No way to identify components that are not reporting status` :white_check_mark:: This can be implemented as a feature addition to component status reporting without blocking `component` 1.0.
+- `Should component health reporting be an opt-in for component.Host implementations?` :white_check_mark:: Yes. A `component.Host` implementation is not required to provide a component status reporting feature. It may do so via an additional interface, such as `componentstatus.Reporter`.
+- `Component TelemetrySettings Requirements` :white_check_mark:: `component.TelemetrySettings.ReportStatus` has been removed. Instead, component status reporting is expected to be provided via an additional interface that `component.Host` implements. Components can check whether the `component.Host` implements the desired interface, such as `componentstatus.Reporter`, to access component status reporting features.
+
+## Reference
+
+- Remove FatalError? 
Looking for opinions either way: https://github.com/open-telemetry/opentelemetry-collector/issues/9823 +- In order to prioritize lifecycle events over runtime events for status reporting, allow a component to transition from PermanentError -> Stopping: https://github.com/open-telemetry/opentelemetry-collector/issues/10058 +- Runtime status reporting for components in core: https://github.com/open-telemetry/opentelemetry-collector/issues/9957 +- Should Start return an error: https://github.com/open-telemetry/opentelemetry-collector/issues/9324 +- Should Shutdown return an error: https://github.com/open-telemetry/opentelemetry-collector/issues/9325 +- Status reporting doc incoming; preview here: https://github.com/mwear/opentelemetry-collector/blob/cc870fd2a7160da298acdda447511ea9a83455e0/docs/component-status.md +- Issues + - Closed: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8349 + - Open: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/8816 +- Status Reporting PRs + - Closed + - https://github.com/open-telemetry/opentelemetry-collector/pull/5304 + - https://github.com/open-telemetry/opentelemetry-collector/pull/6550 + - https://github.com/open-telemetry/opentelemetry-collector/pull/6560 + - Merged + - https://github.com/open-telemetry/opentelemetry-collector/pull/8169 + + + + diff --git a/exporter/exporterbatcher/batcher.go b/exporter/exporterbatcher/batcher.go new file mode 100644 index 00000000000..7b9b3e1c598 --- /dev/null +++ b/exporter/exporterbatcher/batcher.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher" + +// type Batcher[T any] struct { +// batches batch* + +// flushTimeout int // TODO +// flushFunc func(req T) err +// } + +// func (b *Batcher[T]) FlushIfNecessary() error { +// mu.Lock() + +// var batchToExport +// var now = time.Now() +// if now - lastFlushTime > flushTimeout || activeBatch.size() > minBatchSize { +// lastFlushTime = now +// batchToExport = activeBatch +// activeBatch = pendingBatches[0] +// pendingBatches = pendingBatches[1:] +// } +// qc.timer.Reset(batcher.FlushTimeout) +// mu.Unlock() +// flushFunc(req) +// } + + +// func (b *Batcher[T]) Push(item T) error { +// if maxSize != 0 && batches[0].size() + item.size() > maxSize: +// sdlfj + + +// } diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index 5b93dd1466a..c8bcdd26754 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -3,710 +3,710 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" -import ( - "context" - "errors" - "runtime" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exporterbatcher" - "go.opentelemetry.io/collector/exporter/exporterqueue" -) - -func TestBatchSender_Merge(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - - tests := []struct { - name string - batcherOption Option - }{ - { - name: "split_disabled", - batcherOption: WithBatcher(cfg, 
WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "split_high_limit", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 1000 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // the first two requests should be merged into one and sent by reaching the minimum items size - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 - }, 50*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) - - // the third and fifth requests should be sent by reaching the timeout - // the fourth request should be ignored because of the merge error. - time.Sleep(50 * time.Millisecond) - - // should be ignored because of the merge error. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, - mergeErr: errors.New("merge error")})) - - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchExportError(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - tests := []struct { - name string - batcherOption Option - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "merge_without_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - { - name: "merge_with_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - expectedRequests: 1, - expectedItems: 20, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - - // the first two requests should be blocked by the batchSender. - time.Sleep(50 * time.Millisecond) - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - - // the third request should trigger the export and cause an error. 
- errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} - require.NoError(t, be.send(context.Background(), errReq)) - - // the batch should be dropped since the queue doesn't have requeuing enabled. - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && - sink.itemsCount.Load() == tt.expectedItems && - be.batchSender.(*batchSender).activeRequests.Load() == 0 && - be.queueSender.(*queueSender).queue.Size() == 0 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_MergeOrSplit(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) - - // request that cannot be split should be dropped. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, - mergeErr: errors.New("split error")})) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_Shutdown(t *testing.T) { - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // To make the request reached the batchSender before shutdown. 
- time.Sleep(50 * time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) - - // shutdown should force sending the batch - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_Disabled(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // should be sent right away without splitting because batching is disabled. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []Request{}, nil - } - // otherwise reply with a single request. - return []Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_PostShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. 
- sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") - } - tests := []struct { - name string - batcherCfg exporterbatcher.Config - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_without_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - cfg.MaxSizeItems = 200 - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_with_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 10 - return cfg - }(), - expectedRequests: 8, - expectedItems: 51, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - qCfg := exporterqueue.NewDefaultConfig() - qCfg.NumConsumers = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 3rd request should be flushed by itself due to flush interval - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 - }, 100*time.Millisecond, 10*time.Millisecond) - - // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. 
- assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - if tt.batcherCfg.MaxSizeItems == 10 { - // in case of MaxSizeItems=10, wait for the leftover request to send - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 - }, 50*time.Millisecond, 10*time.Millisecond) - } - - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchBlocking(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - // should be sent in two batches since the batch size is 3 - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(6), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestBatchSender_BatchCancelled(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - assert.Equal(t, uint64(0), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSender_DrainActiveRequests(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - 
require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. - require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "funcs_set_twice", - opts: []Option{ - withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, - { - name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) - if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - -func TestBatchSender_UnstartedShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - - err = be.Shutdown(context.Background()) - require.NoError(t, err) -} - -// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being -// merged. 
-func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. - startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - require.Nil(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} - -func TestBatchSenderWithTimeout(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutSettings() - tCfg.Timeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - assert.NoError(t, be.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) -} - -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, 
nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSenderTimerFlush(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - require.NoError(t, be.Shutdown(context.Background())) -} - -func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - return be -} +// import ( +// "context" +// "errors" +// "runtime" +// "sync" +// "testing" +// "time" + +// "github.com/stretchr/testify/assert" +// 
"github.com/stretchr/testify/require" + +// "go.opentelemetry.io/collector/component/componenttest" +// "go.opentelemetry.io/collector/exporter/exporterbatcher" +// "go.opentelemetry.io/collector/exporter/exporterqueue" +// ) + +// func TestBatchSender_Merge(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond + +// tests := []struct { +// name string +// batcherOption Option +// }{ +// { +// name: "split_disabled", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "split_high_limit", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 1000 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // the first two requests should be merged into one and sent by reaching the minimum items size +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) + +// // the third and fifth requests should be sent by reaching the timeout +// // the fourth request should be ignored because of the merge error. +// time.Sleep(50 * time.Millisecond) + +// // should be ignored because of the merge error. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, +// mergeErr: errors.New("merge error")})) + +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchExportError(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// tests := []struct { +// name string +// batcherOption Option +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "merge_without_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 200 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// { +// name: "merge_with_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 20 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// expectedRequests: 1, +// expectedItems: 20, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + +// // the first two requests should be blocked by the batchSender. +// time.Sleep(50 * time.Millisecond) +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) + +// // the third request should trigger the export and cause an error. +// errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} +// require.NoError(t, be.send(context.Background(), errReq)) + +// // the batch should be dropped since the queue doesn't have requeuing enabled. +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && +// sink.itemsCount.Load() == tt.expectedItems && +// be.batchSender.(*batchSender).activeRequests.Load() == 0 && +// be.queueSender.(*queueSender).queue.Size() == 0 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_MergeOrSplit(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 5 +// cfg.MaxSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// // should be sent right away by reaching the minimum items size. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // big request should be broken down into two requests, both are sent right away. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // request that cannot be split should be dropped. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, +// mergeErr: errors.New("split error")})) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_Shutdown(t *testing.T) { +// batchCfg := exporterbatcher.NewDefaultConfig() +// batchCfg.MinSizeItems = 10 +// be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // To make the request reached the batchSender before shutdown. +// time.Sleep(50 * time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) + +// // shutdown should force sending the batch +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_Disabled(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.Enabled = false +// cfg.MaxSizeItems = 5 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // should be sent right away without splitting because batching is disabled. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []Request{}, nil +// } +// // otherwise reply with a single request. +// return []Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_PostShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// assert.NoError(t, be.Shutdown(context.Background())) + +// // Closed batch sender should act as a pass-through to not block queue draining. +// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") +// } +// tests := []struct { +// name string +// batcherCfg exporterbatcher.Config +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_without_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// cfg.MaxSizeItems = 200 +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_with_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 10 +// return cfg +// }(), +// expectedRequests: 8, +// expectedItems: 51, +// }, +// } +// for _, tt := range tests { +// tt := tt +// t.Run(tt.name, func(t *testing.T) { +// qCfg := exporterqueue.NewDefaultConfig() +// qCfg.NumConsumers = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. 
+// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 3rd request should be flushed by itself due to flush interval +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// if tt.batcherCfg.MaxSizeItems == 10 { +// // in case of MaxSizeItems=10, wait for the leftover request to send +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchBlocking(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 3 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 6 blocking requests +// wg := sync.WaitGroup{} +// for i := 0; i < 6; i++ { +// wg.Add(1) +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// // should be sent in two batches since the batch size is 3 +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(6), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// // Validate that the batch is cancelled once the first request in the request is cancelled +// func TestBatchSender_BatchCancelled(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, 
fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 2 blocking requests +// wg := sync.WaitGroup{} +// ctx, cancel := context.WithCancel(context.Background()) +// wg.Add(1) +// go func() { +// assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// wg.Add(1) +// go func() { +// time.Sleep(20 * time.Millisecond) // ensure this call is the second +// assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// cancel() // canceling the first request should cancel the whole batch +// wg.Wait() + +// // nothing should be delivered +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) +// assert.Equal(t, uint64(0), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSender_DrainActiveRequests(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 3 blocking requests with a timeout +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() + +// // give time for the first two requests to be batched +// time.Sleep(20 * time.Millisecond) + +// // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. +// // It should take 120 milliseconds to complete. +// require.NoError(t, be.Shutdown(context.Background())) + +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_WithBatcherOption(t *testing.T) { +// tests := []struct { +// name string +// opts []Option +// expectedErr bool +// }{ +// { +// name: "no_funcs_set", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: true, +// }, +// { +// name: "funcs_set_internally", +// opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: false, +// }, +// { +// name: "funcs_set_twice", +// opts: []Option{ +// withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc)), +// }, +// expectedErr: true, +// }, +// { +// name: "nil_funcs", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, +// expectedErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) 
+// if tt.expectedErr { +// assert.Nil(t, be) +// assert.Error(t, err) +// } else { +// assert.NotNil(t, be) +// assert.NoError(t, err) +// } +// }) +// } +// } + +// func TestBatchSender_UnstartedShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) + +// err = be.Shutdown(context.Background()) +// require.NoError(t, err) +// } + +// // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// // merged. +// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. 
+// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// require.Nil(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } + +// func TestBatchSenderWithTimeout(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 10 +// tCfg := NewDefaultTimeoutSettings() +// tCfg.Timeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithTimeout(tCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 3 concurrent requests that should be merged in one batch +// wg := sync.WaitGroup{} +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// wg.Done() +// }() +// } +// wg.Wait() +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) + +// // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// assert.NoError(t, be.Shutdown(context.Background())) + +// // The sink should not change +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// } + +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 := r2.(*fakeRequest) +// if fr2.mergeErr != nil { +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// 
require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSenderTimerFlush(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 100 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() +// time.Sleep(50 * time.Millisecond) + +// // Send 2 concurrent requests that should be merged in one batch and sent immediately +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 30*time.Millisecond, 5*time.Millisecond) + +// // Send another request that should be flushed after 100ms instead of 50ms since last flush +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // Confirm that it is not flushed in 50ms +// time.Sleep(60 * time.Millisecond) +// assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) + +// // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) +// time.Sleep(50 * time.Millisecond) +// assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, +// WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// return be +// } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 060edab813a..de9f8428d86 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -72,6 +72,7 @@ type queueSender struct { numConsumers int traceAttribute attribute.KeyValue consumers *queue.Consumers[Request] + batcher *queue.QueueBatcher[Request] obsrep *obsReport exporterID component.ID @@ -86,7 +87,19 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo obsrep: obsrep, exporterID: set.ID, } - consumeFunc := func(ctx context.Context, req Request) error { + + // consumeFunc := func(ctx context.Context, req Request) error { + // err := qs.nextSender.send(ctx, req) + // if err != nil { + // set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + // zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + // } + // return err + // } + + // qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc) + + exportFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) if err != nil { set.Logger.Error("Exporting failed. 
Dropping data."+exportFailureMessage, @@ -94,13 +107,17 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo } return err } - qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc) + qs.batcher = queue.NewQueueBatcher[Request](q, numConsumers, exportFunc) return qs } // Start is invoked during service startup. func (qs *queueSender) Start(ctx context.Context, host component.Host) error { - if err := qs.consumers.Start(ctx, host); err != nil { + // if err := qs.consumers.Start(ctx, host); err != nil { + // return err + // } + + if err := qs.batcher.Start(ctx, host); err != nil { return err } @@ -117,7 +134,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error { func (qs *queueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - return qs.consumers.Shutdown(ctx) + + // return qs.consumers.Shutdown(ctx) + + return qs.batcher.Shutdown(ctx) } // send implements the requestSender interface. It puts the request in the queue. diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 98e1b281176..d0fce6abf22 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,19 +40,33 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } +func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) { + item, ok := q.sizedChannel.pop() + if !ok { + return item.req, ok, nil + } + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) + onClaim() + return item.req, ok, nil +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + item, ok := q.sizedChannel.pop() if !ok { return false } + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true } +func (pq *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) { +} + // Shutdown closes the queue channel to initiate draining of the queue. 
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { q.sizedChannel.shutdown() diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 7dd646c6ef3..a09ce17eb8c 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -188,33 +188,51 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } + +func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) { + if _, ok := pq.sizedChannel.pop(); !ok { + var t T + return t, false, nil + } + onClaim() + + var ( + req T + onProcessingFinished func(error) + consumed bool + ) + req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) + if consumed { + pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req)) + } + return req, true, onProcessingFinished +} + +func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) { + if err := pq.itemDispatchingFinish(ctx, index); err != nil { + pq.logger.Error("Error deleting item from queue", zap.Error(err)) + } +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { for { - var ( - req T - onProcessingFinished func(error) - consumed bool - ) - // If we are stopped we still process all the other events in the channel before, but we // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) - if !consumed { - return 0 - } - return pq.set.Sizer.Sizeof(req) - }) - if !ok { + if _, ok := pq.sizedChannel.pop(); !ok { return false } - if consumed { - onProcessingFinished(consumeFunc(context.Background(), req)) - return true + + req, onProcessingFinished, ok := pq.getNextItem(context.Background()) + if !ok { + return false } + + pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req)) + onProcessingFinished(consumeFunc(context.Background(), req)) + return true } } diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go index 35bc504579e..652a0a93be8 100644 --- a/exporter/internal/queue/queue.go +++ b/exporter/internal/queue/queue.go @@ -25,6 +25,10 @@ type Queue[T any] interface { // without violating capacity restrictions. If success returns no error. // It returns ErrQueueIsFull if no space is currently available. Offer(ctx context.Context, item T) error + + ClaimAndRead(onClaim func()) (T, bool, func(error)) + CommitConsume(ctx context.Context, index uint64) + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. 
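For orientation, a non-normative sketch of how the two `Queue` methods added above appear intended to be driven by a consumer, based on the `QueueBatcher` worker loop introduced in the next file. `queueWorker` and `exportFn` are illustrative names only and are not part of this change:

```golang
package queue

import "context"

// queueWorker is an illustrative consumer loop, assuming the extended Queue interface above.
func queueWorker[T any](ctx context.Context, q Queue[T], exportFn func(context.Context, T) error) {
	for {
		// Claim and read the next item; ok is false once the queue is stopped and drained.
		req, ok, onProcessingFinished := q.ClaimAndRead(func() { /* item has been claimed */ })
		if !ok {
			return
		}
		// Export the item and report the outcome back to the queue.
		err := exportFn(ctx, req)
		if onProcessingFinished != nil {
			// For the persistent queue this callback finishes processing of the claimed item;
			// the in-memory queue returns nil here.
			onProcessingFinished(err)
		}
	}
}
```

The sketch does not exercise `CommitConsume`; in this diff only the persistent queue gives it a body, where it deletes the dispatched item from storage.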
diff --git a/exporter/internal/queue/queue_batcher.go b/exporter/internal/queue/queue_batcher.go
new file mode 100644
index 00000000000..ae37b6eb62b
--- /dev/null
+++ b/exporter/internal/queue/queue_batcher.go
@@ -0,0 +1,82 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+ "context"
+ "sync"
+ // "time"
+
+ "go.opentelemetry.io/collector/component"
+)
+
+// QueueBatcher pulls items from the queue with a pool of worker goroutines and exports them.
+// Batching itself is not implemented yet; see the TODOs in Start.
+type QueueBatcher[T any] struct {
+ queue Queue[T]
+ numWorkers int
+ exportFunc func(context.Context, T) error
+ stopWG sync.WaitGroup
+}
+
+func NewQueueBatcher[T any](q Queue[T], numWorkers int, exportFunc func(context.Context, T) error) *QueueBatcher[T] {
+ return &QueueBatcher[T]{
+ queue: q,
+ numWorkers: numWorkers,
+ exportFunc: exportFunc,
+ stopWG: sync.WaitGroup{},
+ }
+}
+
+// Start starts the queue and the worker goroutines that consume from it.
+func (qb *QueueBatcher[T]) Start(ctx context.Context, host component.Host) error {
+ if err := qb.queue.Start(ctx, host); err != nil {
+ return err
+ }
+
+ // timer := time.NewTimer(1 * time.Second) // TODO
+ // allocConsumer := make(chan bool, 2)
+
+ // go func() {
+ // allocConsumer <- true
+ // }()
+
+ var startWG sync.WaitGroup
+ for i := 0; i < qb.numWorkers; i++ {
+ qb.stopWG.Add(1)
+ startWG.Add(1)
+ go func() {
+ startWG.Done()
+ defer qb.stopWG.Done()
+ for {
+ // select {
+ // case <-timer.C:
+ // qb.batcher.FlushIfNecessary() // TODO
+ // case <- allocConsumer:
+ // Claim the next item, export it, and report the result back to the queue.
+ req, ok, onProcessingFinished := qb.queue.ClaimAndRead(func() {})
+ if !ok {
+ return
+ }
+ err := qb.exportFunc(ctx, req)
+ if onProcessingFinished != nil {
+ onProcessingFinished(err)
+ }
+ // Put item into the batch
+ // qb.batcher.Push(item) // TODO
+ // qb.batcher.FlushIfNecessary() // TODO
+ // }
+ }
+ }()
+ }
+ startWG.Wait()
+
+ return nil
+}
+
+// Shutdown stops the queue and waits for all worker goroutines to finish.
+func (qb *QueueBatcher[T]) Shutdown(ctx context.Context) error {
+ if err := qb.queue.Shutdown(ctx); err != nil {
+ return err
+ }
+ qb.stopWG.Wait()
+ return nil
+}
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
index 1702a38ac2f..9a0aae783ac 100644
--- a/exporter/internal/queue/sized_channel.go
+++ b/exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"
 // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
 // The channel will accept elements until the total size of the elements reaches the capacity.
+// Not thread safe.
 type sizedChannel[T any] struct {
 used *atomic.Int64
@@ -59,27 +60,46 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
 return nil
 }
-// pop removes the element from the queue and returns it.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-// The callback is called before the element is removed from the queue. It must return the size of the element.
-func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
- el, ok := <-vcq.ch
- if !ok {
- return el, false
- }
+// REMOVE
+// NOTE: the main change in sized_channel is that pop() is separated into pop() and updateSize().
+// This is because we want to parallelize "reading" from the queue, and we only know the item size
+// after reading. We also need to update the queue size in case the batch ends up failing.
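+//
+// Illustration only (not part of this change): with the split API, a caller such as
+// boundedMemoryQueue.ClaimAndRead pops first and adjusts the size once the element is known:
+//
+// item, ok := q.sizedChannel.pop()
+// if ok {
+// q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
+// }
+//
+// and, per the note above, a positive delta would add the size back if the batch containing
+// the element ends up failing.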
- size := callback(el)
+// pop removes the next element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped and emptied, in which case ok is false.
+// Unlike the previous pop, it does not adjust the used size; callers must call updateSize once the element's size is known.
+func (vcq *sizedChannel[T]) pop() (T, bool) {
+ el, ok := <-vcq.ch
+ return el, ok
+}
+
+// updateSize adjusts the tracked used size by deltaSize (typically the negated size of an element
+// that was just read), never letting it go below zero.
+func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
 // The used size and the channel size might be not in sync with the queue in case it's restored from the disk
 // because we don't flush the current queue size on the disk on every read/write.
 // In that case we need to make sure it doesn't go below 0.
- if vcq.used.Add(-size) < 0 {
+ if vcq.used.Add(deltaSize) < 0 {
 vcq.used.Store(0)
 }
- return el, true
 }
+// // pop removes the element from the queue and returns it.
+// // The call blocks until there is an item available or the queue is stopped.
+// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// // The callback is called before the element is removed from the queue. It must return the size of the element.
+// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+// el, ok := <-vcq.ch
+// if !ok {
+// return el, false
+// }
+
+// size := callback(el)
+
+// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk
+// // because we don't flush the current queue size on the disk on every read/write.
+// // In that case we need to make sure it doesn't go below 0.
+// if vcq.used.Add(-size) < 0 {
+// vcq.used.Store(0)
+// }
+// return el, true
+// }
+
 // syncSize updates the used size to 0 if the queue is empty.
 // The caller must ensure that this call is not called concurrently with push.
 // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always