[Ingester] Create one goroutine per tenant to flush traces to disk (#4483)

* fix jsonnet example

Signed-off-by: Joe Elliott <[email protected]>

* make one goroutine per instance for wal flush

Signed-off-by: Joe Elliott <[email protected]>

* restore subservice watcher in case it did something

Signed-off-by: Joe Elliott <[email protected]>

* harden shutdown

Signed-off-by: Joe Elliott <[email protected]>

* remove flush event

Signed-off-by: Joe Elliott <[email protected]>

* undo accidental change

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

* remove debug log

Signed-off-by: Joe Elliott <[email protected]>

* Remove broken tests

Signed-off-by: Joe Elliott <[email protected]>

* Revert "fix jsonnet example"

This reverts commit 5e58675.

* update func name for clarity

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
joe-elliott authored Jan 17, 2025
1 parent c4b5e7d commit 1f8d337
Showing 3 changed files with 85 additions and 69 deletions.
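
In outline, this commit removes the ingester's single periodic sweep loop — which queries could block, causing flushes to disk to fall behind and memory to spike — and instead starts one long-lived goroutine per tenant instance. Each goroutine waits for a start signal (closed once WAL replay finishes), then cuts that tenant's complete traces to the WAL on a ticker, and exits when a shared stop channel is closed; a WaitGroup lets shutdown wait for all of them. The sketch below is a minimal, self-contained illustration of that pattern only: the names used here (tenantFlusher, loopForTenant, cutToWal) are invented for the example, whereas the real implementation uses cutToWalLoop, cutToWalStart, cutToWalStop and cutToWalWg as shown in the diff below.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tenantFlusher is a simplified stand-in for the ingester: it owns the
// start/stop signals and the wait group shared by all per-tenant goroutines.
type tenantFlusher struct {
	start chan struct{}  // closed once WAL replay is done; loops wait on it
	stop  chan struct{}  // closed on shutdown; loops exit when it fires
	wg    sync.WaitGroup // lets shutdown wait for every per-tenant goroutine
}

func newTenantFlusher() *tenantFlusher {
	return &tenantFlusher{
		start: make(chan struct{}),
		stop:  make(chan struct{}),
	}
}

// loopForTenant launches one goroutine for a single tenant. It blocks until
// the start signal (or exits early on stop), then cuts that tenant's traces
// to the WAL on every tick until stop is closed.
func (f *tenantFlusher) loopForTenant(tenant string, period time.Duration) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()

		// Wait for replay to finish before cutting anything.
		select {
		case <-f.start:
		case <-f.stop:
			return
		}

		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				f.cutToWal(tenant)
			case <-f.stop:
				return
			}
		}
	}()
}

// cutToWal stands in for cutting one tenant's complete traces to disk.
func (f *tenantFlusher) cutToWal(tenant string) {
	fmt.Println("cutting traces to wal for", tenant)
}

func main() {
	f := newTenantFlusher()

	// One goroutine per tenant, created lazily as tenants appear.
	f.loopForTenant("tenant-a", 100*time.Millisecond)
	f.loopForTenant("tenant-b", 100*time.Millisecond)

	close(f.start) // WAL replay finished: begin periodic cuts
	time.Sleep(350 * time.Millisecond)

	close(f.stop) // shutdown: stop all loops, then wait for them
	f.wg.Wait()
}
```

Starting each loop lazily when a tenant instance is first created (as getOrCreateInstance does in the diff) keeps the number of goroutines proportional to the number of active tenants.
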
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,8 @@
## main / unreleased

* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
61 changes: 39 additions & 22 deletions modules/ingester/flush.go
@@ -72,20 +72,6 @@ const (
opKindFlush
)

// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
func (i *Ingester) Flush() {
instances := i.getInstances()

for _, instance := range instances {
err := instance.CutCompleteTraces(0, true)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut complete traces on shutdown", "err", err)
}
}
}

// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
@@ -124,9 +110,9 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
return
}
level.Info(log.Logger).Log("msg", "flushing instance", "instance", instance.instanceID)
i.sweepInstance(instance, true)
i.cutOneInstanceToWal(instance, true)
} else {
i.sweepAllInstances(true)
i.cutAllInstancesToWal()
}

w.WriteHeader(http.StatusNoContent)
@@ -151,16 +137,47 @@ func (o *flushOp) Priority() int64 {
return -o.at.Unix()
}

// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
// cutToWalLoop kicks off a goroutine for the passed instance that will periodically cut traces to WAL.
// it signals completion through cutToWalWg, waits for cutToWalStart and stops on cutToWalStop.
func (i *Ingester) cutToWalLoop(instance *instance) {
i.cutToWalWg.Add(1)

go func() {
defer i.cutToWalWg.Done()

// wait for the signal to start. we need the wal to be completely replayed
// before we start cutting to WAL
select {
case <-i.cutToWalStart:
case <-i.cutToWalStop:
return
}

// ticker
ticker := time.NewTicker(i.cfg.FlushCheckPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
i.cutOneInstanceToWal(instance, false)
case <-i.cutToWalStop:
return
}
}
}()
}

// cutAllInstancesToWal periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) cutAllInstancesToWal() {
instances := i.getInstances()

for _, instance := range instances {
i.sweepInstance(instance, immediate)
i.cutOneInstanceToWal(instance, true)
}
}

func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
func (i *Ingester) cutOneInstanceToWal(instance *instance, immediate bool) {
// cut traces internally
err := instance.CutCompleteTraces(i.cfg.MaxTraceIdle, immediate)
if err != nil {
@@ -204,6 +221,7 @@ func (i *Ingester) flushLoop(j int) {
if o == nil {
return
}

op := o.(*flushOp)
op.attempts++

@@ -246,7 +264,7 @@ func handleAbandonedOp(op *flushOp) {
func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())
withSpan(level.Info(log.Logger), sp).Log("msg", "completing block", "tenant", op.userID, "block", op.blockID.String())

// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
@@ -256,7 +274,6 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool,
}

start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
90 changes: 44 additions & 46 deletions modules/ingester/ingester.go
@@ -77,7 +77,11 @@ type Ingester struct {
flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup

limiter Limiter
// manages synchronous behavior with startCutToWal
cutToWalWg sync.WaitGroup
cutToWalStop chan struct{}
cutToWalStart chan struct{}
limiter Limiter

// Used by ingest storage when enabled
ingestPartitionLifecycler *ring.PartitionInstanceLifecycler
@@ -97,13 +101,16 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
replayJitter: true,
overrides: overrides,

cutToWalStart: make(chan struct{}),
cutToWalStop: make(chan struct{}),
}

i.pushErr.Store(ErrStarting)

i.local = store.WAL().LocalBackend()

lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, nil, "ingester", cfg.OverrideRingKey, true, log.Logger, prometheus.WrapRegistererWithPrefix("tempo_", reg))
if err != nil {
return nil, fmt.Errorf("NewLifecycler failed: %w", err)
}
@@ -145,7 +152,7 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro
i.subservicesWatcher = services.NewFailureWatcher()
i.subservicesWatcher.WatchService(i.lifecycler)

i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
return i, nil
}

@@ -183,49 +190,38 @@ func (i *Ingester) starting(ctx context.Context) error {
}
}

// accept traces
i.pushErr.Store(nil)

// start flushing traces to wal
close(i.cutToWalStart)

return nil
}

func (i *Ingester) loop(ctx context.Context) error {
flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
defer flushTicker.Stop()

for {
select {
case <-flushTicker.C:
i.sweepAllInstances(false)

case <-ctx.Done():
return nil

case err := <-i.subservicesWatcher.Chan():
return fmt.Errorf("ingester subservice failed: %w", err)
}
}
}

// complete the flushing
// ExclusiveQueues.activekeys keeps track of flush operations due for processing
// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
// sweepAllInstances prepares remaining traces to be flushed by flushLoop routine, also updating ExclusiveQueues.activekeys with keys for new flush operations
// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
// This ensures that i.flushQueues is empty only when all traces are flushed
func (i *Ingester) flushRemaining() {
i.sweepAllInstances(true)
for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
func (i *Ingester) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
return fmt.Errorf("ingester subservice failed: %w", err)
}
}

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
i.markUnavailable()

// flush any remaining traces
// signal all cutting to wal to stop and wait for all goroutines to finish
close(i.cutToWalStop)
i.cutToWalWg.Wait()

if i.cfg.FlushAllOnShutdown {
// force all in memory traces to be flushed to disk AND fully flush them to the backend
i.flushRemaining()
} else {
// force all in memory traces to be flushed to disk
i.cutAllInstancesToWal()
}

if i.flushQueues != nil {
@@ -238,6 +234,19 @@ func (i *Ingester) stopping(_ error) error {
return nil
}

// complete the flushing
// ExclusiveQueues.activekeys keeps track of flush operations due for processing
// ExclusiveQueues.IsEmpty check uses ExclusiveQueues.activeKeys to determine if flushQueues is empty or not
// cutAllInstancesToWal prepares remaining traces to be flushed by the flushLoop routine, also updating ExclusiveQueues.activeKeys with keys for new flush operations
// ExclusiveQueues.activeKeys is cleared of a flush operation when a processing of flush operation is either successful or doesn't return retry signal
// This ensures that i.flushQueues is empty only when all traces are flushed
func (i *Ingester) flushRemaining() {
i.cutAllInstancesToWal()
for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}
}

func (i *Ingester) markUnavailable() {
// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
@@ -248,7 +257,7 @@ func (i *Ingester) markUnavailable() {
}

// This will prevent us accepting any more samples
i.stopIncomingRequests()
i.pushErr.Store(ErrShuttingDown)
}

// PushBytes implements tempopb.Pusher.PushBytes. Traces pushed to this endpoint are expected to be in the formats
@@ -376,6 +385,8 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) {
return nil, err
}
i.instances[instanceID] = inst

i.cutToWalLoop(inst)
}
return inst, nil
}
@@ -399,19 +410,6 @@ func (i *Ingester) getInstances() []*instance {
return instances
}

// stopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) stopIncomingRequests() {
i.instancesMtx.Lock()
defer i.instancesMtx.Unlock()

i.pushErr.Store(ErrShuttingDown)
}

// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(context.Context) error {
return ring.ErrTransferDisabled
}

func (i *Ingester) replayWal() error {
level.Info(log.Logger).Log("msg", "beginning wal replay")

