Skip to content

Commit

Permalink
Parquet improvements (#1629)
Browse files Browse the repository at this point in the history
* Pass by ref when sorting trace to avoid copying

* Properly alloc slices when converting proto to parquet

* Update parquet lib

* Remove prefetch and adjust pooling to limit parquet compactor mem usage

* Repair test

* Parquet honor max_bytes_per_trace during compaction

* Reduce input blocks from 8 to 4 to reduce memory pressure of parquet when large traces present, more info in compaction complete log

* lint

* Update mod for serverless

* lint

* Parquet buffer tweak, tweak test

* lint
  • Loading branch information
mdisibio authored Aug 11, 2022
1 parent 4e48657 commit 948a6af
Show file tree
Hide file tree
Showing 69 changed files with 2,192 additions and 1,392 deletions.
2 changes: 1 addition & 1 deletion cmd/tempo-serverless/cloud-run/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rs/xid v1.2.1 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a // indirect
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d // indirect
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/tempo-serverless/cloud-run/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1738,8 +1738,8 @@ github.com/segmentio/encoding v0.3.5/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oH
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a h1:i0PfxHlz0A3eAec0nEooLrvmGbaHloNqB20vvnmrHzk=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d h1:IxYmTiYCMaR3B0fjD9igXDr1AE5M8KRFbhEhEJ1WYx4=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-serverless/lambda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rs/xid v1.2.1 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a // indirect
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d // indirect
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/tempo-serverless/lambda/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1740,8 +1740,8 @@ github.com/segmentio/encoding v0.3.5/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oH
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a h1:i0PfxHlz0A3eAec0nEooLrvmGbaHloNqB20vvnmrHzk=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d h1:IxYmTiYCMaR3B0fjD9igXDr1AE5M8KRFbhEhEJ1WYx4=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
github.com/prometheus/prometheus v1.8.2-0.20220228151929-e25a59925555
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d
github.com/sirupsen/logrus v1.8.1
github.com/sony/gobreaker v0.4.1
github.com/spf13/viper v1.12.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1933,8 +1933,8 @@ github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszj
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a h1:i0PfxHlz0A3eAec0nEooLrvmGbaHloNqB20vvnmrHzk=
github.com/segmentio/parquet-go v0.0.0-20220711225945-6dc5e4bb634a/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d h1:IxYmTiYCMaR3B0fjD9igXDr1AE5M8KRFbhEhEJ1WYx4=
github.com/segmentio/parquet-go v0.0.0-20220802221544-d84ed320251d/go.mod h1:BuMbRhCCg3gFchup9zucJaUjQ4m6RxX+iVci37CoMPQ=
github.com/sercand/kuberesolver v2.1.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
Expand Down
8 changes: 8 additions & 0 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,19 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte
return objs[0], wasCombined, nil
}

// RecordDiscardedSpans forwards a discarded-span count for a tenant to the
// overrides module, tagged with the compactor-specific discard reason.
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string) {
overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID)
}

// BlockRetentionForTenant implements CompactorOverrides. It returns the
// per-tenant block retention duration from the runtime overrides.
func (c *Compactor) BlockRetentionForTenant(tenantID string) time.Duration {
return c.overrides.BlockRetention(tenantID)
}

// MaxBytesPerTraceForTenant implements CompactorOverrides. It returns the
// per-tenant maximum trace size in bytes from the runtime overrides
// (presumably 0 disables the limit — confirm against the overrides module).
func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int {
return c.overrides.MaxBytesPerTrace(tenantID)
}

// isSharded reports whether a ring KV store is configured, i.e. whether this
// compactor participates in a sharded compactor ring.
func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compaction_block_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type CompactionBlockSelector interface {
const (
activeWindowDuration = 24 * time.Hour
defaultMinInputBlocks = 2
defaultMaxInputBlocks = 8
defaultMaxInputBlocks = 4
)

/*************************** Time Window Block Selector **************************/
Expand Down
20 changes: 16 additions & 4 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
}

var err error

defer func() {
level.Info(rw.logger).Log("msg", "compaction complete")
}()
startTime := time.Now()

var totalRecords int
for _, blockMeta := range blockMetas {
Expand Down Expand Up @@ -187,6 +184,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
IteratorBufferSize: rw.compactorCfg.IteratorBufferSize,
OutputBlocks: outputBlocks,
Combiner: combiner,
MaxBytesPerTrace: rw.compactorOverrides.MaxBytesPerTraceForTenant(tenantID),
BytesWritten: func(compactionLevel, bytes int) {
metricCompactionBytesWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(bytes))
},
Expand All @@ -196,6 +194,9 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
ObjectsWritten: func(compactionLevel, objs int) {
metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs))
},
SpansDiscarded: func(spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID)
},
}

compactor := enc.NewCompactor(opts)
Expand All @@ -210,6 +211,17 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string

metricCompactionBlocks.WithLabelValues(compactionLevelLabel).Add(float64(len(blockMetas)))

logArgs := []interface{}{
"msg",
"compaction complete",
"elapsed",
time.Since(startTime),
}
for _, meta := range newCompactedBlocks {
logArgs = append(logArgs, "block", fmt.Sprintf("%+v", meta))
}
level.Info(rw.logger).Log(logArgs...)

return nil
}

Expand Down
11 changes: 9 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (m *mockSharder) Combine(dataEncoding string, tenantID string, objs ...[]by
return model.StaticCombiner.Combine(dataEncoding, objs...)
}

func (m *mockSharder) RecordDiscardedSpans(count int, tenantID string) {}

// mockCombiner is an empty test stub for the object-combiner dependency.
type mockCombiner struct {
}

Expand All @@ -56,13 +58,18 @@ type mockJobSharder struct{}
func (m *mockJobSharder) Owns(_ string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
blockRetention time.Duration
maxBytesPerTrace int
}

// BlockRetentionForTenant returns the fixed retention configured on the mock,
// ignoring the tenant.
func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration {
return m.blockRetention
}

// MaxBytesPerTraceForTenant returns the fixed per-trace byte limit configured
// on the mock, ignoring the tenant.
func (m *mockOverrides) MaxBytesPerTraceForTenant(_ string) int {
return m.maxBytesPerTrace
}

func TestCompactionRoundtrip(t *testing.T) {
testEncodings := []string{v2.VersionString, vparquet.VersionString}
for _, enc := range testEncodings {
Expand Down Expand Up @@ -787,7 +794,7 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {
IteratorBufferSize: DefaultIteratorBufferSize,
}, &mockSharder{}, &mockOverrides{})

traceCount := 10_000
traceCount := 20_000
blockCount := 8

// Cut input blocks
Expand Down
2 changes: 2 additions & 0 deletions tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ type CompactionOptions struct {
ChunkSizeBytes uint32
FlushSizeBytes uint32
IteratorBufferSize int // How many traces to prefetch async.
MaxBytesPerTrace int
OutputBlocks uint8
BlockConfig BlockConfig
Combiner model.ObjectCombiner

ObjectsCombined func(compactionLevel, objects int)
ObjectsWritten func(compactionLevel, objects int)
BytesWritten func(compactionLevel, bytes int)
SpansDiscarded func(spans int)
}

type Iterator interface {
Expand Down
16 changes: 8 additions & 8 deletions tempodb/encoding/vparquet/combiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,33 +157,33 @@ func SortTrace(t *Trace) {
for _, b := range t.ResourceSpans {
for _, ils := range b.InstrumentationLibrarySpans {
sort.Slice(ils.Spans, func(i, j int) bool {
return compareSpans(ils.Spans[i], ils.Spans[j])
return compareSpans(&ils.Spans[i], &ils.Spans[j])
})
}
sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool {
return compareIls(b.InstrumentationLibrarySpans[i], b.InstrumentationLibrarySpans[j])
return compareIls(&b.InstrumentationLibrarySpans[i], &b.InstrumentationLibrarySpans[j])
})
}
sort.Slice(t.ResourceSpans, func(i, j int) bool {
return compareBatches(t.ResourceSpans[i], t.ResourceSpans[j])
return compareBatches(&t.ResourceSpans[i], &t.ResourceSpans[j])
})
}

func compareBatches(a, b ResourceSpans) bool {
func compareBatches(a, b *ResourceSpans) bool {
if len(a.InstrumentationLibrarySpans) > 0 && len(b.InstrumentationLibrarySpans) > 0 {
return compareIls(a.InstrumentationLibrarySpans[0], b.InstrumentationLibrarySpans[0])
return compareIls(&a.InstrumentationLibrarySpans[0], &b.InstrumentationLibrarySpans[0])
}
return false
}

func compareIls(a, b ILS) bool {
func compareIls(a, b *ILS) bool {
if len(a.Spans) > 0 && len(b.Spans) > 0 {
return compareSpans(a.Spans[0], b.Spans[0])
return compareSpans(&a.Spans[0], &b.Spans[0])
}
return false
}

func compareSpans(a, b Span) bool {
func compareSpans(a, b *Span) bool {
// Sort by start time, then id
if a.StartUnixNanos == b.StartUnixNanos {
return bytes.Compare(a.ID, b.ID) == -1
Expand Down
51 changes: 51 additions & 0 deletions tempodb/encoding/vparquet/combiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package vparquet
import (
"testing"

"github.com/dustin/go-humanize"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -138,3 +140,52 @@ func TestCombiner(t *testing.T) {
}
}
}

// BenchmarkCombine measures combining two generated parquet traces through the
// Combiner (ConsumeWithFinal then Result) at several total span counts.
func BenchmarkCombine(b *testing.B) {

batchCount := 100
spanCounts := []int{
100, 1000, 10000,
}

for _, spanCount := range spanCounts {
// Sub-benchmark per total span count (batches × spans per batch).
b.Run("SpanCount:"+humanize.SI(float64(batchCount*spanCount), ""), func(b *testing.B) {
// Build two distinct traces outside the timed region.
id1 := test.ValidTraceID(nil)
tr1 := traceToParquet(id1, test.MakeTraceWithSpanCount(batchCount, spanCount, id1))

id2 := test.ValidTraceID(nil)
tr2 := traceToParquet(id2, test.MakeTraceWithSpanCount(batchCount, spanCount, id2))

b.ResetTimer()

for i := 0; i < b.N; i++ {
c := NewCombiner()
c.ConsumeWithFinal(&tr1, false)
// Second consume is marked final before taking the result.
c.ConsumeWithFinal(&tr2, true)
c.Result()
}
})
}
}

// BenchmarkSortTrace measures SortTrace on a generated parquet trace at
// several total span counts.
func BenchmarkSortTrace(b *testing.B) {

batchCount := 100
spanCounts := []int{
100, 1000, 10000,
}

for _, spanCount := range spanCounts {
// Sub-benchmark per total span count (batches × spans per batch).
b.Run("SpanCount:"+humanize.SI(float64(batchCount*spanCount), ""), func(b *testing.B) {

// Build the trace once outside the timed region; note SortTrace mutates
// it in place, so iterations after the first sort already-sorted data.
id := test.ValidTraceID(nil)
tr := traceToParquet(id, test.MakeTraceWithSpanCount(batchCount, spanCount, id))

b.ResetTimer()

for i := 0; i < b.N; i++ {
SortTrace(&tr)
}
})
}
}
Loading

0 comments on commit 948a6af

Please sign in to comment.