From 1f8d337a856c5bbd945d31b14a34a1fce9c0f6c6 Mon Sep 17 00:00:00 2001
From: Joe Elliott
Date: Fri, 17 Jan 2025 09:13:52 -0500
Subject: [PATCH] [Ingester] Create one goroutine per tenant to flush traces to
 disk (#4483)

* fix jsonnet example

Signed-off-by: Joe Elliott

* make one goroutine per instance for wal flush

Signed-off-by: Joe Elliott

* restore subservice watcher in case it did something

Signed-off-by: Joe Elliott

* harden shutdown

Signed-off-by: Joe Elliott

* remove flush event

Signed-off-by: Joe Elliott

* undo accidental change

Signed-off-by: Joe Elliott

* changelog

Signed-off-by: Joe Elliott

* remove debug log

Signed-off-by: Joe Elliott

* Remove broken tests

Signed-off-by: Joe Elliott

* Revert "fix jsonnet example"

This reverts commit 5e586751c8b894c45f0e12613e80487f0754adeb.

* update func name for clarity

Signed-off-by: Joe Elliott

---------

Signed-off-by: Joe Elliott
---
 CHANGELOG.md                 |  3 +-
 modules/ingester/flush.go    | 61 +++++++++++++++---------
 modules/ingester/ingester.go | 90 ++++++++++++++++++------------------
 3 files changed, 85 insertions(+), 69 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1812b01646b..8cf522f3a4f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,8 @@
 ## main / unreleased
 
-* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
 * [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
+* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
+* [ENHANCEMENT] Prevent queries in the ingester from blocking trace flushes to disk and causing memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
 * [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
 * [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
 * [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
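The flush.go changes below replace the ingester's single sweep loop with one cut-to-WAL goroutine per tenant, gated on a start channel and torn down through a shared stop channel plus a WaitGroup. A minimal, self-contained sketch of that pattern (the flusher type, cutLoop, and tenant names are illustrative, not code from this patch):

package main

import (
    "fmt"
    "sync"
    "time"
)

// flusher sketches the per-tenant cut loop pattern: each goroutine
// blocks on a start gate, then ticks until a shared stop channel closes.
type flusher struct {
    wg    sync.WaitGroup
    start chan struct{} // closed once WAL replay finishes
    stop  chan struct{} // closed on shutdown
}

func (f *flusher) cutLoop(tenant string, period time.Duration) {
    f.wg.Add(1)
    go func() {
        defer f.wg.Done()

        // wait for the signal to start, but stay shutdown-safe
        select {
        case <-f.start:
        case <-f.stop:
            return
        }

        ticker := time.NewTicker(period)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                fmt.Println("cutting traces to WAL for", tenant)
            case <-f.stop:
                return
            }
        }
    }()
}

func main() {
    f := &flusher{start: make(chan struct{}), stop: make(chan struct{})}
    f.cutLoop("tenant-a", 10*time.Millisecond)
    f.cutLoop("tenant-b", 10*time.Millisecond)

    close(f.start)                    // begin cutting after "replay"
    time.Sleep(35 * time.Millisecond) // let a few ticks fire
    close(f.stop)                     // stop all loops...
    f.wg.Wait()                       // ...and wait for them to exit
}

Because every loop also selects on stop while waiting for start, shutdown cannot deadlock even if the ingester stops before WAL replay completes.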
diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go
index 9bb3afd2291..3464ee1a620 100644
--- a/modules/ingester/flush.go
+++ b/modules/ingester/flush.go
@@ -72,20 +72,6 @@ const (
 	opKindFlush
 )
 
-// Flush triggers a flush of all in memory traces to disk. This is called
-// by the lifecycler on shutdown and will put our traces in the WAL to be
-// replayed.
-func (i *Ingester) Flush() {
-	instances := i.getInstances()
-
-	for _, instance := range instances {
-		err := instance.CutCompleteTraces(0, true)
-		if err != nil {
-			level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut complete traces on shutdown", "err", err)
-		}
-	}
-}
-
 // ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
 // * Stop incoming writes by exiting from the ring
 // * Flush all blocks to backend
@@ -124,9 +110,9 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 		level.Info(log.Logger).Log("msg", "flushing instance", "instance", instance.instanceID)
-		i.sweepInstance(instance, true)
+		i.cutOneInstanceToWal(instance, true)
 	} else {
-		i.sweepAllInstances(true)
+		i.cutAllInstancesToWal()
 	}
 
 	w.WriteHeader(http.StatusNoContent)
@@ -151,16 +137,47 @@ func (o *flushOp) Priority() int64 {
 	return -o.at.Unix()
 }
 
-// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
-func (i *Ingester) sweepAllInstances(immediate bool) {
+// cutToWalLoop kicks off a goroutine for the passed instance that will periodically cut traces to the WAL.
+// It signals completion through cutToWalWg, waits for cutToWalStart and stops on cutToWalStop.
+func (i *Ingester) cutToWalLoop(instance *instance) {
+	i.cutToWalWg.Add(1)
+
+	go func() {
+		defer i.cutToWalWg.Done()
+
+		// wait for the signal to start. we need the wal to be completely replayed
+		// before we start cutting to WAL
+		select {
+		case <-i.cutToWalStart:
+		case <-i.cutToWalStop:
+			return
+		}
+
+		// periodically cut traces to the WAL until told to stop
+		ticker := time.NewTicker(i.cfg.FlushCheckPeriod)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				i.cutOneInstanceToWal(instance, false)
+			case <-i.cutToWalStop:
+				return
+			}
+		}
+	}()
+}
+
+// cutAllInstancesToWal immediately cuts complete traces in all instances to the WAL
+func (i *Ingester) cutAllInstancesToWal() {
 	instances := i.getInstances()
 
 	for _, instance := range instances {
-		i.sweepInstance(instance, immediate)
+		i.cutOneInstanceToWal(instance, true)
 	}
 }
 
-func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
+func (i *Ingester) cutOneInstanceToWal(instance *instance, immediate bool) {
 	// cut traces internally
 	err := instance.CutCompleteTraces(i.cfg.MaxTraceIdle, immediate)
 	if err != nil {
@@ -204,6 +221,7 @@ func (i *Ingester) flushLoop(j int) {
 		if o == nil {
 			return
 		}
+
 		op := o.(*flushOp)
 		op.attempts++
 
@@ -246,7 +264,7 @@ func handleAbandonedOp(op *flushOp) {
 func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
 	ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
 	defer sp.End()
-	withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())
+	withSpan(level.Info(log.Logger), sp).Log("msg", "completing block", "tenant", op.userID, "block", op.blockID.String())
 
 	// No point in proceeding if shutdown has been initiated since
 	// we won't be able to queue up the next flush op
@@ -256,7 +274,6 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp,
 	}
 
 	start := time.Now()
-	level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
 	instance, err := i.getOrCreateInstance(op.userID)
 	if err != nil {
 		return false, err
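The ingester.go changes that follow wire those channels into the service lifecycle: starting() closes cutToWalStart once WAL replay is done, running() merely waits, and stopping() closes cutToWalStop before draining. A rough sketch of that shape using dskit's services package (the miniIngester type is invented for illustration; the real starting() also replays the WAL and joins the ring, and the real running() also watches subservices):

package main

import (
    "context"
    "fmt"

    "github.com/grafana/dskit/services"
)

type miniIngester struct {
    services.Service
    start chan struct{}
    stop  chan struct{}
}

func newMiniIngester() *miniIngester {
    i := &miniIngester{start: make(chan struct{}), stop: make(chan struct{})}
    // starting -> running -> stopping, driven by the dskit service state machine
    i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
    return i
}

func (i *miniIngester) starting(ctx context.Context) error {
    // ... replay WAL here ...
    close(i.start) // let the per-tenant cut loops begin
    return nil
}

func (i *miniIngester) running(ctx context.Context) error {
    <-ctx.Done() // nothing to do but wait for shutdown
    return nil
}

func (i *miniIngester) stopping(_ error) error {
    close(i.stop) // every cut loop observes this and returns
    return nil
}

func main() {
    ing := newMiniIngester()
    ctx := context.Background()
    _ = services.StartAndAwaitRunning(ctx, ing)
    fmt.Println("running")
    _ = services.StopAndAwaitTerminated(ctx, ing)
}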
diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go
index eac6c8c378d..fc6c722f727 100644
--- a/modules/ingester/ingester.go
+++ b/modules/ingester/ingester.go
@@ -77,7 +77,11 @@ type Ingester struct {
 	flushQueues     *flushqueues.ExclusiveQueues
 	flushQueuesDone sync.WaitGroup
 
-	limiter Limiter
+	// coordinates starting and stopping the per-tenant cut-to-WAL goroutines
+	cutToWalWg    sync.WaitGroup
+	cutToWalStop  chan struct{}
+	cutToWalStart chan struct{}
+	limiter       Limiter
 
 	// Used by ingest storage when enabled
 	ingestPartitionLifecycler *ring.PartitionInstanceLifecycler
@@ -97,13 +101,16 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
 		flushQueues:  flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
 		replayJitter: true,
 		overrides:    overrides,
+
+		cutToWalStart: make(chan struct{}),
+		cutToWalStop:  make(chan struct{}),
 	}
 	i.pushErr.Store(ErrStarting)
 
 	i.local = store.WAL().LocalBackend()
 
-	lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
+	lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, nil, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
 	if err != nil {
 		return nil, fmt.Errorf("NewLifecycler failed: %w", err)
 	}
 
@@ -145,7 +152,7 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
 
 	i.subservicesWatcher = services.NewFailureWatcher()
 	i.subservicesWatcher.WatchService(i.lifecycler)
 
-	i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
+	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
 	return i, nil
 }
@@ -183,39 +190,21 @@ func (i *Ingester) starting(ctx context.Context) error {
 		}
 	}
 
+	// accept traces
 	i.pushErr.Store(nil)
 
+	// start flushing traces to wal
+	close(i.cutToWalStart)
+
 	return nil
 }
 
-func (i *Ingester) loop(ctx context.Context) error {
-	flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
-	defer flushTicker.Stop()
-
-	for {
-		select {
-		case <-flushTicker.C:
-			i.sweepAllInstances(false)
-
-		case <-ctx.Done():
-			return nil
-
-		case err := <-i.subservicesWatcher.Chan():
-			return fmt.Errorf("ingester subservice failed: %w", err)
-		}
-	}
-}
-
-// complete the flushing
-// ExclusiveQueues.activekeys keeps track of flush operations due for processing
-// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
-// sweepAllInstances prepares remaining traces to be flushed by flushLoop routine, also updating ExclusiveQueues.activekeys with keys for new flush operations
-// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
-// This ensures that i.flushQueues is empty only when all traces are flushed
-func (i *Ingester) flushRemaining() {
-	i.sweepAllInstances(true)
-	for !i.flushQueues.IsEmpty() {
-		time.Sleep(100 * time.Millisecond)
+func (i *Ingester) running(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return nil
+	case err := <-i.subservicesWatcher.Chan():
+		return fmt.Errorf("ingester subservice failed: %w", err)
 	}
 }
 
@@ -223,9 +212,16 @@ func (i *Ingester) stopping(_ error) error {
 	i.markUnavailable()
 
-	// flush any remaining traces
+	// signal all cutting to wal to stop and wait for all goroutines to finish
+	close(i.cutToWalStop)
+	i.cutToWalWg.Wait()
+
 	if i.cfg.FlushAllOnShutdown {
+		// force all in memory traces to be flushed to disk AND fully flush them to the backend
 		i.flushRemaining()
+	} else {
+		// force all in memory traces to be flushed to disk
+		i.cutAllInstancesToWal()
 	}
 
 	if i.flushQueues != nil {
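When FlushAllOnShutdown is set, stopping() relies on flushRemaining() (re-added below) to cut everything to the WAL and then poll the flush queues until they drain. A tiny illustration of that poll-until-empty idiom, with an atomic counter standing in for ExclusiveQueues.activeKeys (names here are invented for the sketch):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// pending stands in for ExclusiveQueues.activeKeys: flushLoop workers
// decrement it as ops complete; an IsEmpty-style check gates shutdown.
var pending atomic.Int64

func flushRemaining() {
    for pending.Load() > 0 { // analogous to !flushQueues.IsEmpty()
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    pending.Store(3)
    go func() { // simulated flushLoop draining ops
        for pending.Load() > 0 {
            time.Sleep(150 * time.Millisecond)
            pending.Add(-1)
        }
    }()
    flushRemaining()
    fmt.Println("all flush ops complete; safe to stop flush queues")
}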
@@ -238,6 +234,19 @@ func (i *Ingester) stopping(_ error) error {
 	return nil
 }
 
+// complete the flushing
+// ExclusiveQueues.activeKeys keeps track of flush operations due for processing
+// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
+// cutAllInstancesToWal prepares remaining traces to be flushed by the flushLoop routine, also updating ExclusiveQueues.activeKeys with keys for new flush operations
+// ExclusiveQueues.activeKeys is cleared of a flush operation when processing of the flush operation either succeeds or doesn't return a retry signal
+// This ensures that i.flushQueues is empty only when all traces are flushed
+func (i *Ingester) flushRemaining() {
+	i.cutAllInstancesToWal()
+	for !i.flushQueues.IsEmpty() {
+		time.Sleep(100 * time.Millisecond)
+	}
+}
+
 func (i *Ingester) markUnavailable() {
 	// Lifecycler can be nil if the ingester is for a flusher.
 	if i.lifecycler != nil {
@@ -248,7 +257,7 @@ func (i *Ingester) markUnavailable() {
 	}
 
 	// This will prevent us accepting any more samples
-	i.stopIncomingRequests()
+	i.pushErr.Store(ErrShuttingDown)
 }
 
 // PushBytes implements tempopb.Pusher.PushBytes. Traces pushed to this endpoint are expected to be in the formats
@@ -376,6 +385,8 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) {
 			return nil, err
 		}
 		i.instances[instanceID] = inst
+
+		i.cutToWalLoop(inst)
 	}
 	return inst, nil
 }
@@ -399,19 +410,6 @@ func (i *Ingester) getInstances() []*instance {
 	return instances
 }
 
-// stopIncomingRequests implements ring.Lifecycler.
-func (i *Ingester) stopIncomingRequests() {
-	i.instancesMtx.Lock()
-	defer i.instancesMtx.Unlock()
-
-	i.pushErr.Store(ErrShuttingDown)
-}
-
-// TransferOut implements ring.Lifecycler.
-func (i *Ingester) TransferOut(context.Context) error {
-	return ring.ErrTransferDisabled
-}
-
 func (i *Ingester) replayWal() error {
 	level.Info(log.Logger).Log("msg", "beginning wal replay")
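Finally, getOrCreateInstance() is now the only place a tenant's cut loop is created, so each tenant gets exactly one goroutine for its lifetime. A compact sketch of that lazy, mutex-guarded registration (registry and getOrCreate are illustrative stand-ins for the Ingester's instance map):

package main

import (
    "fmt"
    "sync"
)

type instance struct{ id string }

type registry struct {
    mtx       sync.Mutex
    instances map[string]*instance
    started   map[string]bool
}

// getOrCreate returns the tenant's instance, spawning its cut loop
// exactly once on first sight of the tenant.
func (r *registry) getOrCreate(id string) *instance {
    r.mtx.Lock()
    defer r.mtx.Unlock()

    inst, ok := r.instances[id]
    if !ok {
        inst = &instance{id: id}
        r.instances[id] = inst
        // the PR starts the per-tenant goroutine here via
        // i.cutToWalLoop(inst); this sketch just records it
        r.started[id] = true
    }
    return inst
}

func main() {
    r := &registry{instances: map[string]*instance{}, started: map[string]bool{}}
    r.getOrCreate("tenant-a")
    r.getOrCreate("tenant-a") // second call reuses the instance
    fmt.Println("loops started:", len(r.started)) // 1
}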