Skip to content

Commit

Permalink
distributor: prevent panics when concurrently calling to forwarder's… (
Browse files Browse the repository at this point in the history
…#1422)

* distributor: prevent panics when concurrently calling  to forwarder's queueManager

* Update PR number

* Pin k6 version to v0.37.0
  • Loading branch information
mapno committed May 5, 2022
1 parent 013a0db commit d3880a9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [BUGFIX] metrics-generator: don't inject X-Scope-OrgID header for single-tenant setups [1417](https://github.com/grafana/tempo/pull/1417) (@kvrhdn)
* [BUGFIX] compactor: populate `compaction_objects_combined_total` and `tempo_discarded_spans_total{reason="trace_too_large_to_compact"}` metrics again [1420](https://github.com/grafana/tempo/pull/1420) (@mdisibio)
* [BUGFIX] distributor: prevent panics when concurrently calling `shutdown` to forwarder's queueManager [1422](https://github.com/grafana/tempo/pull/1422) (@mapno)

## v1.4.0 / 2022-04-28

Expand Down
2 changes: 1 addition & 1 deletion integration/bench/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
k6Image = "loadimpact/k6:latest"
k6Image = "loadimpact/k6:0.37.0"
)

func TestAllInOne(t *testing.T) {
Expand Down
16 changes: 7 additions & 9 deletions modules/distributor/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,15 @@ func (m *queueManager) forwardRequest(ctx context.Context, req *request) {
}

func (m *queueManager) shutdown() error {
// Already being shutdown
if m.readOnly.Load() {
return nil
}
// Call to stopWorkers only once
if m.readOnly.CAS(false, true) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

m.readOnly.Store(true)
return m.stopWorkers(ctx)
}

return m.stopWorkers(ctx)
return nil
}

func (m *queueManager) stopWorkers(ctx context.Context) error {
Expand Down

0 comments on commit d3880a9

Please sign in to comment.