Skip to content

Commit

Permalink
Add sync pool enable options.
Browse files Browse the repository at this point in the history
We accept to minimize allocations on requests by sync Pools
  • Loading branch information
maksim.konovalov committed Jan 13, 2025
1 parent 2ea04f8 commit f1ee9fd
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
48 changes: 45 additions & 3 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -14,6 +15,14 @@ import (
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

// Create a global pool for bytes.Buffer
var responseBufferPool = sync.Pool{
New: func() interface{} {
// The New function creates a new bytes.Buffer instance if the pool is empty
return &bytes.Buffer{}
},
}

// --------------------------------------------------------------------------------
// -- API
// --------------------------------------------------------------------------------
Expand All @@ -36,6 +45,9 @@ type vshardStorageCallResponseProto struct {
AssertError *assertError // not nil if there is assert error
VshardError *StorageCallVShardError // not nil if there is vshard response
CallResp VshardRouterCallResp

EnableResponseSyncPool bool
EnableDecodersSyncPool bool
}

func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -119,7 +131,13 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error
}

// isVShardRespOk is true
buf := bytes.NewBuffer(nil)
var buf *bytes.Buffer

if r.EnableResponseSyncPool {
buf = responseBufferPool.Get().(*bytes.Buffer)
} else {
buf = bytes.NewBuffer(nil)
}

buf.WriteByte(msgpcode.FixedArrayLow | byte(respArrayLen-1))

Expand All @@ -130,6 +148,9 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error

r.CallResp.buf = buf

r.CallResp.enableDecodersSyncPool = r.EnableDecodersSyncPool
r.CallResp.enableResponseSyncPool = r.EnableResponseSyncPool

return nil
}

Expand Down Expand Up @@ -197,7 +218,16 @@ const (

// VshardRouterCallResp represents a response from Router.Call[XXX] methods.
type VshardRouterCallResp struct {
buf *bytes.Buffer
buf *bytes.Buffer
enableDecodersSyncPool bool
enableResponseSyncPool bool
}

func (r VshardRouterCallResp) Close() {
if r.enableResponseSyncPool {
r.buf.Reset()
responseBufferPool.Put(r.buf)
}
}

// Get returns a response from user defined function as []interface{}.
Expand All @@ -210,7 +240,15 @@ func (r VshardRouterCallResp) Get() ([]interface{}, error) {

// GetTyped decodes a response from user defined function into custom values.
func (r VshardRouterCallResp) GetTyped(result interface{}) error {
return msgpack.Unmarshal(r.buf.Bytes(), result)
if !r.enableDecodersSyncPool {
return msgpack.Unmarshal(r.buf.Bytes(), result)
}

decoder := msgpack.GetDecoder()
decoder.Reset(r.buf)
defer msgpack.PutDecoder(decoder)

return decoder.Decode(result)
}

// Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
Expand Down Expand Up @@ -294,6 +332,10 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
r.log().Infof(ctx, "Try call %s on replicaset %s for bucket %d", fnc, rs.info.Name, bucketID)

var storageCallResponse vshardStorageCallResponseProto

storageCallResponse.EnableResponseSyncPool = r.cfg.EnableResponseSyncPool
storageCallResponse.EnableDecodersSyncPool = r.cfg.EnableDecodersSyncPool

err = rs.conn.Do(tntReq, poolMode).GetTyped(&storageCallResponse)
if err != nil {
return VshardRouterCallResp{}, fmt.Errorf("got error on future.GetTyped(): %w", err)
Expand Down
69 changes: 69 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,72 @@ func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) {

b.ReportAllocs()
}

func BenchmarkRouter_Call_Select_SyncPool(b *testing.B) {
b.StopTimer()

ctx := context.Background()

router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
TopologyProvider: static.NewProvider(topology),
DiscoveryTimeout: 5 * time.Second,
DiscoveryMode: vshardrouter.DiscoveryModeOn,
TotalBucketCount: totalBucketCount,
User: username,
EnableResponseSyncPool: true,
EnableDecodersSyncPool: true,
})
require.NoError(b, err)

err = router.ClusterBootstrap(ctx, true)
require.NoError(b, err)

ids := make([]uuid.UUID, b.N)

for i := 0; i < b.N; i++ {
id := uuid.New()
ids[i] = id

bucketID := router.RouterBucketIDStrCRC32(id.String())
resp, err := router.Call(
ctx,
bucketID,
vshardrouter.CallModeRW,
"product_add",
[]interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}},
vshardrouter.CallOpts{},
)
require.NoError(b, err)
resp.Close()
}

type Request struct {
ID string `msgpack:"id"`
}

b.StartTimer()
for i := 0; i < b.N; i++ {
id := ids[i]

bucketID := router.RouterBucketIDStrCRC32(id.String())
resp, err1 := router.Call(
ctx,
bucketID,
vshardrouter.CallModeBRO,
"product_get",
[]interface{}{&Request{ID: id.String()}},
vshardrouter.CallOpts{Timeout: time.Second},
)

var product Product

err2 := resp.GetTyped(&[]interface{}{&product})
resp.Close()
b.StopTimer()
require.NoError(b, err1)
require.NoError(b, err2)
b.StartTimer()
}

b.ReportAllocs()
}
9 changes: 9 additions & 0 deletions vshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ type Config struct {
// that is, our retry timeout if the buckets, for example, move.
// Currently, it only works for sugar implementations .
RequestTimeout time.Duration

// EnableResponseSyncPool determines whether the sync pool for responses should be enabled.
// When enabled, a pool will be used to reuse response objects, improving memory management and performance.
// If this option is enabled, you must call the Close method on the Response object when done using it,
// otherwise, memory leaks may occur due to unreleased resources.
EnableResponseSyncPool bool
// EnableDecodersSyncPool determines whether the sync pool for decoders should be enabled.
// When enabled, a pool will be used to reuse decoder objects, reducing the overhead of creating new decoders.
EnableDecodersSyncPool bool
}

type BucketStatInfo struct {
Expand Down

0 comments on commit f1ee9fd

Please sign in to comment.