From f1ee9fd54b116018ae34de266d1856466a5035d8 Mon Sep 17 00:00:00 2001 From: "maksim.konovalov" Date: Mon, 13 Jan 2025 15:55:06 +0300 Subject: [PATCH] Add sync pool enable options. We accept to minimize allocations on requests by sync Pools --- api.go | 48 ++++++++++++++++++++++++++++++--- tarantool_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++ vshard.go | 9 +++++++ 3 files changed, 123 insertions(+), 3 deletions(-) diff --git a/api.go b/api.go index ea9b1e4..b12a9e0 100644 --- a/api.go +++ b/api.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync" "time" "github.com/google/uuid" @@ -14,6 +15,14 @@ import ( "github.com/vmihailenco/msgpack/v5/msgpcode" ) +// Create a global pool for bytes.Buffer +var responseBufferPool = sync.Pool{ + New: func() interface{} { + // The New function creates a new bytes.Buffer instance if the pool is empty + return &bytes.Buffer{} + }, +} + // -------------------------------------------------------------------------------- // -- API // -------------------------------------------------------------------------------- @@ -36,6 +45,9 @@ type vshardStorageCallResponseProto struct { AssertError *assertError // not nil if there is assert error VshardError *StorageCallVShardError // not nil if there is vshard response CallResp VshardRouterCallResp + + EnableResponseSyncPool bool + EnableDecodersSyncPool bool } func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error { @@ -119,7 +131,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error } // isVShardRespOk is true - buf := bytes.NewBuffer(nil) + var buf *bytes.Buffer + + if r.EnableResponseSyncPool { + buf = responseBufferPool.Get().(*bytes.Buffer) + } else { + buf = bytes.NewBuffer(nil) + } buf.WriteByte(msgpcode.FixedArrayLow | byte(respArrayLen-1)) @@ -130,6 +148,9 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error r.CallResp.buf = buf + r.CallResp.enableDecodersSyncPool = r.EnableDecodersSyncPool + r.CallResp.enableResponseSyncPool = r.EnableResponseSyncPool + return nil } @@ -197,7 +218,16 @@ const ( // VshardRouterCallResp represents a response from Router.Call[XXX] methods. type VshardRouterCallResp struct { - buf *bytes.Buffer + buf *bytes.Buffer + enableDecodersSyncPool bool + enableResponseSyncPool bool +} + +func (r VshardRouterCallResp) Close() { + if r.enableResponseSyncPool { + r.buf.Reset() + responseBufferPool.Put(r.buf) + } } // Get returns a response from user defined function as []interface{}. @@ -210,7 +240,15 @@ func (r VshardRouterCallResp) Get() ([]interface{}, error) { // GetTyped decodes a response from user defined function into custom values. func (r VshardRouterCallResp) GetTyped(result interface{}) error { - return msgpack.Unmarshal(r.buf.Bytes(), result) + if !r.enableDecodersSyncPool { + return msgpack.Unmarshal(r.buf.Bytes(), result) + } + + decoder := msgpack.GetDecoder() + decoder.Reset(r.buf) + defer msgpack.PutDecoder(decoder) + + return decoder.Decode(result) } // Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'. @@ -294,6 +332,10 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode, r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID) var storageCallResponse vshardStorageCallResponseProto + + storageCallResponse.EnableResponseSyncPool = r.cfg.EnableResponseSyncPool + storageCallResponse.EnableDecodersSyncPool = r.cfg.EnableDecodersSyncPool + err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse) if err != nil { return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err) diff --git a/tarantool_test.go b/tarantool_test.go index c29dff1..f40e7fc 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -467,3 +467,72 @@ func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { b.ReportAllocs() } + +func BenchmarkRouter_Call_Select_SyncPool(b *testing.B) { + b.StopTimer() + + ctx := context.Background() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(topology), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, + EnableResponseSyncPool: true, + EnableDecodersSyncPool: true, + }) + require.NoError(b, err) + + err = router.ClusterBootstrap(ctx, true) + require.NoError(b, err) + + ids := make([]uuid.UUID, b.N) + + for i := 0; i < b.N; i++ { + id := uuid.New() + ids[i] = id + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + resp, err := router.Call( + ctx, + bucketID, + vshardrouter.CallModeRW, + "product_add", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, + vshardrouter.CallOpts{}, + ) + require.NoError(b, err) + resp.Close() + } + + type Request struct { + ID string `msgpack:"id"` + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + id := ids[i] + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + resp, err1 := router.Call( + ctx, + bucketID, + vshardrouter.CallModeBRO, + "product_get", + []interface{}{&Request{ID: id.String()}}, + vshardrouter.CallOpts{Timeout: time.Second}, + ) + + var product Product + + err2 := resp.GetTyped(&[]interface{}{&product}) + resp.Close() + b.StopTimer() + require.NoError(b, err1) + require.NoError(b, err2) + b.StartTimer() + } + + b.ReportAllocs() +} diff --git a/vshard.go b/vshard.go index d90e581..e437cf7 100644 --- a/vshard.go +++ b/vshard.go @@ -120,6 +120,15 @@ type Config struct { // that is, our retry timeout if the buckets, for example, move. // Currently, it only works for sugar implementations . RequestTimeout time.Duration + + // EnableResponseSyncPool determines whether the sync pool for responses should be enabled. + // When enabled, a pool will be used to reuse response objects, improving memory management and performance. + // If this option is enabled, you must call the Close method on the Response object when done using it, + // otherwise, memory leaks may occur due to unreleased resources. + EnableResponseSyncPool bool + // EnableDecodersSyncPool determines whether the sync pool for decoders should be enabled. + // When enabled, a pool will be used to reuse decoder objects, reducing the overhead of creating new decoders. + EnableDecodersSyncPool bool } type BucketStatInfo struct {