From e0352448dd3f41c42984197a81d28cca267be7ae Mon Sep 17 00:00:00 2001 From: "maksim.konovalov" Date: Fri, 10 Jan 2025 21:00:37 +0300 Subject: [PATCH] Reworked attributes, decoding logic, removed deprecated methods, and added custom decoding support. In this fix we have deeply reworked many attributes. Interfaces and decoding logic have been redesigned. Some deprecated methods have been removed. Some tests have been moved to a valid location. Decoding now works with custom decoding logic. --- CHANGELOG.md | 16 ++ README.md | 25 +- README_ru.md | 180 +++++++------- api.go | 184 +++++--------- api_test.go | 46 ---- config.lua | 39 +++ examples/customer/go-service/main.go | 26 +- go.mod | 2 +- go.sum | 5 +- replicaset.go | 33 --- sugar.go | 25 +- tarantool_test.go | 336 +++++++++++++++++++++++--- tests/tnt/call_bench_test.go | 206 +--------------- tests/tnt/concurrent_topology_test.go | 6 +- tests/tnt/replicaset_test.go | 84 ------- tests/tnt/router_call_test.go | 113 --------- tests/tnt/tnt_test.go | 15 -- tests/tnt/topology_test.go | 58 ----- 18 files changed, 551 insertions(+), 848 deletions(-) delete mode 100644 tests/tnt/router_call_test.go delete mode 100644 tests/tnt/topology_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 03de5e9..baa5682 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,24 @@ ## Unreleased CHANGES: + * Instance UUID no more required, use instance name instead. * Removed toolchain go1.23.3. +* Refactored GetTyped interface and logic. Now we use raw msg buffer instead raw messages. Interface works and looks + like go-tarantool response. +* ReplicaCall, RouterCallImpl methods was removed cause it works invalid and looks useless. +* All PR, issue references in #XYZ format in commits older than 42f363775dfb9eaf7ec2a6ed7a999847752cec00 refer to https://github.com/KaymeKaydex/go-vshard-router. +* VshardRouterCallMode type renamed to CallMode for simplicity. +* StorageResultTypedFunc type removed as useless type. +* Updated msgpack version from v5.3.5 to v5.4.1. + +TESTS: + +* Write tests in tests/tnt folder are deprecated. +* Removed empty todo tests from tests/tnt. +* Moved TestReplicasetReplicaCall and Go benches from tests/tnt to tarantool_test.go . +* TestRouterCallProto rewrote. +* Start using constants in tarantool_test.go instead duplicate variables. ## v1.3.2 diff --git a/README.md b/README.md index 8842aad..172b14f 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,6 @@ import ( "github.com/google/uuid" "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" ) func main() { @@ -115,11 +114,11 @@ func main() { }: { { Addr: "127.0.0.1:1001", - UUID: uuid.New(), + Name: "1_1", }, { Addr: "127.0.0.1:1002", - UUID: uuid.New(), + Name: "1_2", }, }, vshardrouter.ReplicasetInfo{ @@ -128,11 +127,11 @@ func main() { }: { { Addr: "127.0.0.1:2001", - UUID: uuid.New(), + Name: "2_1", }, { Addr: "127.0.0.1:2002", - UUID: uuid.New(), + Name: "2_2", }, }, }), @@ -153,10 +152,10 @@ func main() { bucketID := vshardrouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10), directRouter.RouterBucketCount()) - interfaceResult, getTyped, err := directRouter.RouterCallImpl( + resp, err := directRouter.Call( ctx, bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, Timeout: time.Second * 2}, + vshardrouter.CallModeBRO, "storage.api.get_user_info", []interface{}{&struct { BucketID uint64 `msgpack:"bucket_id" json:"bucket_id,omitempty"` @@ -166,14 +165,22 @@ func main() { Body: map[string]interface{}{ "user_id": "123456", }, - }}, + }}, vshardrouter.CallOpts{Timeout: time.Second * 2}, ) + if err != nil { + panic(err) + } info := &struct { BirthDay int }{} - err = getTyped(&[]interface{}{info}) + err = resp.GetTyped(&[]interface{}{info}) + if err != nil { + panic(err) + } + + interfaceResult, err := resp.Get() if err != nil { panic(err) } diff --git a/README_ru.md b/README_ru.md index 7c20064..55961b3 100644 --- a/README_ru.md +++ b/README_ru.md @@ -88,99 +88,105 @@ $ go get -u github.com/tarantool/go-vshard-router package main import ( - "context" - "fmt" - "strconv" - "time" + "context" + "fmt" + "strconv" + "time" - vshardrouter "github.com/tarantool/go-vshard-router" - "github.com/tarantool/go-vshard-router/providers/static" + vshardrouter "github.com/tarantool/go-vshard-router" + "github.com/tarantool/go-vshard-router/providers/static" - "github.com/google/uuid" - "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" + "github.com/google/uuid" + "github.com/tarantool/go-tarantool/v2" ) func main() { - ctx := context.Background() - - directRouter, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - DiscoveryTimeout: time.Minute, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TopologyProvider: static.NewProvider(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{ - vshardrouter.ReplicasetInfo{ - Name: "replcaset_1", - UUID: uuid.New(), - }: { - { - Addr: "127.0.0.1:1001", - UUID: uuid.New(), - }, - { - Addr: "127.0.0.1:1002", - UUID: uuid.New(), - }, - }, - vshardrouter.ReplicasetInfo{ - Name: "replcaset_2", - UUID: uuid.New(), - }: { - { - Addr: "127.0.0.1:2001", - UUID: uuid.New(), - }, - { - Addr: "127.0.0.1:2002", - UUID: uuid.New(), - }, - }, - }), - TotalBucketCount: 128000, - PoolOpts: tarantool.Opts{ - Timeout: time.Second, - }, - }) - if err != nil { - panic(err) - } - - user := struct { - ID uint64 - }{ - ID: 123, - } - - bucketID := vshardrouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10), directRouter.RouterBucketCount()) - - interfaceResult, getTyped, err := directRouter.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.PreferRO, Timeout: time.Second * 2}, - "storage.api.get_user_info", - []interface{}{&struct { - BucketID uint64 `msgpack:"bucket_id" json:"bucket_id,omitempty"` - Body map[string]interface{} `msgpack:"body"` - }{ - BucketID: bucketID, - Body: map[string]interface{}{ - "user_id": "123456", - }, - }}, - ) - - info := &struct { - BirthDay int - }{} - - err = getTyped(&[]interface{}{info}) - if err != nil { - panic(err) - } - - fmt.Printf("interface result: %v", interfaceResult) - fmt.Printf("get typed result: %v", info) + ctx := context.Background() + + directRouter, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + DiscoveryTimeout: time.Minute, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TopologyProvider: static.NewProvider(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{ + vshardrouter.ReplicasetInfo{ + Name: "replcaset_1", + UUID: uuid.New(), + }: { + { + Addr: "127.0.0.1:1001", + Name: "1_1", + }, + { + Addr: "127.0.0.1:1002", + Name: "1_2", + }, + }, + vshardrouter.ReplicasetInfo{ + Name: "replcaset_2", + UUID: uuid.New(), + }: { + { + Addr: "127.0.0.1:2001", + Name: "2_1", + }, + { + Addr: "127.0.0.1:2002", + Name: "2_2", + }, + }, + }), + TotalBucketCount: 128000, + PoolOpts: tarantool.Opts{ + Timeout: time.Second, + }, + }) + if err != nil { + panic(err) + } + + user := struct { + ID uint64 + }{ + ID: 123, + } + + bucketID := vshardrouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10), directRouter.RouterBucketCount()) + + resp, err := directRouter.Call( + ctx, + bucketID, + vshardrouter.CallModeBRO, + "storage.api.get_user_info", + []interface{}{&struct { + BucketID uint64 `msgpack:"bucket_id" json:"bucket_id,omitempty"` + Body map[string]interface{} `msgpack:"body"` + }{ + BucketID: bucketID, + Body: map[string]interface{}{ + "user_id": "123456", + }, + }}, vshardrouter.CallOpts{Timeout: time.Second * 2}, + ) + if err != nil { + panic(err) + } + + info := &struct { + BirthDay int + }{} + + err = resp.GetTyped(&[]interface{}{info}) + if err != nil { + panic(err) + } + + interfaceResult, err := resp.Get() + if err != nil { + panic(err) + } + + fmt.Printf("interface result: %v", interfaceResult) + fmt.Printf("get typed result: %v", info) } - ``` ### Провайдеры diff --git a/api.go b/api.go index 712d7c7..ea9b1e4 100644 --- a/api.go +++ b/api.go @@ -1,6 +1,7 @@ package vshard_router //nolint:revive import ( + "bytes" "context" "fmt" "time" @@ -118,15 +119,17 @@ func (r *vshardStorageCallResponseProto) DecodeMsgpack(d *msgpack.Decoder) error } // isVShardRespOk is true - r.CallResp.rawMessages = make([]msgpack.RawMessage, 0, respArrayLen-1) - for i := 1; i < respArrayLen; i++ { - elem, err := d.DecodeRaw() - if err != nil { - return fmt.Errorf("failed to decode into msgpack.RawMessage element #%d of response array", i-1) - } - r.CallResp.rawMessages = append(r.CallResp.rawMessages, elem) + buf := bytes.NewBuffer(nil) + + buf.WriteByte(msgpcode.FixedArrayLow | byte(respArrayLen-1)) + + _, err = buf.ReadFrom(d.Buffered()) + if err != nil { + return err } + r.CallResp.buf = buf + return nil } @@ -169,122 +172,50 @@ func (s StorageCallVShardError) Error() string { return fmt.Sprintf("%+v", alias(s)) } -type StorageResultTypedFunc = func(result ...interface{}) error - type CallOpts struct { - VshardMode VshardMode // vshard mode in call - PoolMode pool.Mode - Timeout time.Duration + Timeout time.Duration } -// VshardRouterCallMode is a type to represent call mode for Router.Call method. -type VshardRouterCallMode int +// CallMode is a type to represent call mode for Router.Call method. +type CallMode int const ( - // VshardRouterCallModeRO sets a read-only mode for Router.Call. - VshardRouterCallModeRO VshardRouterCallMode = iota - // VshardRouterCallModeRW sets a read-write mode for Router.Call. - VshardRouterCallModeRW - // VshardRouterCallModeRE acts like VshardRouterCallModeRO + // CallModeRO sets a read-only mode for Router.Call. + CallModeRO CallMode = iota + // CallModeRW sets a read-write mode for Router.Call. + CallModeRW + // CallModeRE acts like CallModeRO // with preference for a replica rather than a master. // This mode is not supported yet. - VshardRouterCallModeRE - // VshardRouterCallModeBRO acts like VshardRouterCallModeRO with balancing. - VshardRouterCallModeBRO - // VshardRouterCallModeBRE acts like VshardRouterCallModeRO with balancing + CallModeRE + // CallModeBRO acts like CallModeRO with balancing. + CallModeBRO + // CallModeBRE acts like CallModeRO with balancing // and preference for a replica rather than a master. - VshardRouterCallModeBRE + CallModeBRE ) -// VshardRouterCallOptions represents options to Router.Call[XXX] methods. -type VshardRouterCallOptions struct { - Timeout time.Duration -} - // VshardRouterCallResp represents a response from Router.Call[XXX] methods. type VshardRouterCallResp struct { - rawMessages []msgpack.RawMessage + buf *bytes.Buffer } // Get returns a response from user defined function as []interface{}. func (r VshardRouterCallResp) Get() ([]interface{}, error) { - resp := make([]interface{}, len(r.rawMessages)) - return resp, r.GetTyped(resp) -} - -// GetTyped decodes a response from user defined function into custom values. -func (r VshardRouterCallResp) GetTyped(result []interface{}) error { - minLen := len(result) - if dataLen := len(r.rawMessages); dataLen < minLen { - minLen = dataLen - } + var result []interface{} + err := r.GetTyped(&result) - for i := 0; i < minLen; i++ { - if err := msgpack.Unmarshal(r.rawMessages[i], &result[i]); err != nil { - return fmt.Errorf("failed to decode into result[%d] element #%d of response array: %w", i, i, err) - } - } - - return nil + return result, err } -// RouterCallImpl Perform shard operation function will restart operation -// after wrong bucket response until timeout is reached -// Deprecated: RouterCallImpl is deprecated. -// See https://github.com/tarantool/go-vshard-router/issues/110. -// Use Call method with RO, RW, RE, BRO, BRE modes instead. -func (r *Router) RouterCallImpl(ctx context.Context, - bucketID uint64, - opts CallOpts, - fnc string, - args interface{}) (interface{}, StorageResultTypedFunc, error) { - - var vshardCallOpts = VshardRouterCallOptions{ - Timeout: opts.Timeout, - } - - var vshardCallMode VshardRouterCallMode - - switch opts.VshardMode { - case WriteMode: - vshardCallMode = VshardRouterCallModeRW - case ReadMode: - switch opts.PoolMode { - case pool.ANY: - vshardCallMode = VshardRouterCallModeBRO - case pool.RO: - vshardCallMode = VshardRouterCallModeRO - case pool.RW: - return nil, nil, fmt.Errorf("unexpected opts %+v", opts) - case pool.PreferRO: - vshardCallMode = VshardRouterCallModeBRE - case pool.PreferRW: - return nil, nil, fmt.Errorf("unexpected opts %+v", opts) - default: - return nil, nil, fmt.Errorf("unexpected opts.PoolMode %v", opts.PoolMode) - } - default: - return nil, nil, fmt.Errorf("unexpected opts.VshardMode %v", opts.VshardMode) - } - - vshardCallResp, err := r.Call(ctx, bucketID, vshardCallMode, fnc, args, vshardCallOpts) - if err != nil { - return nil, nil, err - } - - data, err := vshardCallResp.Get() - if err != nil { - return nil, nil, err - } - - return data, func(result ...interface{}) error { - return vshardCallResp.GetTyped(result) - }, nil +// GetTyped decodes a response from user defined function into custom values. +func (r VshardRouterCallResp) GetTyped(result interface{}) error { + return msgpack.Unmarshal(r.buf.Bytes(), result) } // Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'. -func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCallMode, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { +func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode, + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { const vshardStorageClientCall = "vshard.storage.call" if bucketID < 1 || r.cfg.TotalBucketCount < bucketID { @@ -295,18 +226,18 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCal var vshardMode VshardMode switch mode { - case VshardRouterCallModeRO: + case CallModeRO: poolMode, vshardMode = pool.RO, ReadMode - case VshardRouterCallModeRW: + case CallModeRW: poolMode, vshardMode = pool.RW, WriteMode - case VshardRouterCallModeRE: + case CallModeRE: // poolMode, vshardMode = pool.PreferRO, ReadMode // since go-tarantool always use balance=true politic, // we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400 return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet") - case VshardRouterCallModeBRO: + case CallModeBRO: poolMode, vshardMode = pool.ANY, ReadMode - case VshardRouterCallModeBRE: + case CallModeBRE: poolMode, vshardMode = pool.PreferRO, ReadMode default: return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode) @@ -447,34 +378,34 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode VshardRouterCal } } -// CallRO is an alias for Call with VshardRouterCallModeRO. +// CallRO is an alias for Call with CallModeRO. func (r *Router) CallRO(ctx context.Context, bucketID uint64, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { - return r.Call(ctx, bucketID, VshardRouterCallModeRO, fnc, args, opts) + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, CallModeRO, fnc, args, opts) } -// CallRW is an alias for Call with VshardRouterCallModeRW. +// CallRW is an alias for Call with CallModeRW. func (r *Router) CallRW(ctx context.Context, bucketID uint64, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { - return r.Call(ctx, bucketID, VshardRouterCallModeRW, fnc, args, opts) + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, CallModeRW, fnc, args, opts) } -// CallRE is an alias for Call with VshardRouterCallModeRE. +// CallRE is an alias for Call with CallModeRE. func (r *Router) CallRE(ctx context.Context, bucketID uint64, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { - return r.Call(ctx, bucketID, VshardRouterCallModeRE, fnc, args, opts) + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, CallModeRE, fnc, args, opts) } -// CallBRO is an alias for Call with VshardRouterCallModeBRO. +// CallBRO is an alias for Call with CallModeBRO. func (r *Router) CallBRO(ctx context.Context, bucketID uint64, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { - return r.Call(ctx, bucketID, VshardRouterCallModeBRO, fnc, args, opts) + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, CallModeBRO, fnc, args, opts) } -// CallBRE is an alias for Call with VshardRouterCallModeBRE. +// CallBRE is an alias for Call with CallModeBRE. func (r *Router) CallBRE(ctx context.Context, bucketID uint64, - fnc string, args interface{}, opts VshardRouterCallOptions) (VshardRouterCallResp, error) { - return r.Call(ctx, bucketID, VshardRouterCallModeBRE, fnc, args, opts) + fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error) { + return r.Call(ctx, bucketID, CallModeBRE, fnc, args, opts) } // RouterMapCallRWOptions sets options for RouterMapCallRW. @@ -613,9 +544,15 @@ func (r *Router) RouterMapCallRWImpl( args interface{}, opts CallOpts, ) (map[uuid.UUID]interface{}, error) { + // nolint:gosimple return RouterMapCallRW[interface{}](r, ctx, fnc, args, RouterMapCallRWOptions{Timeout: opts.Timeout}) } +type replicasetFuture struct { + uuid uuid.UUID + future *tarantool.Future +} + // RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the // cluster with a guarantee that in case of success it was executed with all // buckets being accessible for reads and writes. @@ -657,11 +594,6 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, Context(ctx). Args([]interface{}{"storage_ref", refID, timeout}) - type replicasetFuture struct { - uuid uuid.UUID - future *tarantool.Future - } - var rsFutures = make([]replicasetFuture, 0, len(idToReplicasetRef)) // ref stage: send concurrent ref requests diff --git a/api_test.go b/api_test.go index e8eb1ee..61cfea0 100644 --- a/api_test.go +++ b/api_test.go @@ -1,16 +1,9 @@ package vshard_router // nolint: revive import ( - "context" - "fmt" - "sync/atomic" "testing" - "time" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/tarantool/go-tarantool/v2" - mockpool "github.com/tarantool/go-vshard-router/mocks/pool" ) var emptyRouter = &Router{ @@ -32,42 +25,3 @@ func TestRouter_RouterRouteAll(t *testing.T) { m := emptyRouter.RouterRouteAll() require.Empty(t, m) } - -func TestRouter_RouterCallImpl(t *testing.T) { - t.Parallel() - ctx := context.TODO() - - t.Run("bucket id is out of range", func(t *testing.T) { - t.Parallel() - - _, _, err := emptyRouter.RouterCallImpl(ctx, 100, CallOpts{}, "test", []byte("test")) - require.Errorf(t, err, "bucket id is out of range") - }) - t.Run("future error when router call impl", func(t *testing.T) { - t.Parallel() - r := &Router{ - cfg: Config{ - TotalBucketCount: uint64(10), - Loggerf: emptyLogfProvider, - Metrics: emptyMetricsProvider, - }, - view: &consistentView{ - routeMap: make([]atomic.Pointer[Replicaset], 11), - }, - } - - futureError := fmt.Errorf("testErr") - errFuture := tarantool.NewFuture(tarantool.NewCallRequest("test")) - errFuture.SetError(futureError) - - mPool := mockpool.NewPool(t) - mPool.On("Do", mock.Anything, mock.Anything).Return(errFuture) - - r.view.routeMap[5].Store(&Replicaset{ - conn: mPool, - }) - - _, _, err := r.RouterCallImpl(ctx, 5, CallOpts{Timeout: time.Second, VshardMode: ReadMode}, "test", []byte("test")) - require.ErrorIs(t, err, futureError) - }) -} diff --git a/config.lua b/config.lua index 99f672d..e760d71 100644 --- a/config.lua +++ b/config.lua @@ -2,6 +2,8 @@ require('strict').on() +local uuid = require('uuid') + -- Get instance name local NAME = os.getenv("TEST_TNT_WORK_DIR") local fiber = require('fiber') @@ -69,6 +71,8 @@ box.once('access:v1', function() box.schema.user.grant('guest', 'read,write,execute', 'universe') end) +-- everything below is copypasted from storage.lua in vshard example: +-- https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/example/storage.lua box.once("testapp:schema:1", function() local customer = box.schema.space.create('customer') customer:format({ @@ -79,6 +83,17 @@ box.once("testapp:schema:1", function() customer:create_index('customer_id', {parts = {'customer_id'}}) customer:create_index('bucket_id', {parts = {'bucket_id'}, unique = false}) + -- create products for easy bench + local products = box.schema.space.create('products') + products:format({ + {'id', 'uuid'}, + {'bucket_id', 'unsigned'}, + {'name', 'string'}, + {'count', 'unsigned'}, + }) + products:create_index('id', {parts = {'id'}}) + + local account = box.schema.space.create('account') account:format({ {'account_id', 'unsigned'}, @@ -104,8 +119,13 @@ box.once("testapp:schema:1", function() box.schema.role.grant('public', 'execute', 'function', 'raise_luajit_error') box.schema.func.create('raise_client_error') box.schema.role.grant('public', 'execute', 'function', 'raise_client_error') + + box.schema.user.grant('storage', 'super') + box.schema.user.create('tarantool') + box.schema.user.grant('tarantool', 'super') end) + local function insert_customer(customer) box.space.customer:insert({customer.customer_id, customer.bucket_id, customer.name}) for _, account in ipairs(customer.accounts) do @@ -166,3 +186,22 @@ end function raise_client_error() box.error(box.error.UNKNOWN) end + +-- product_add - simple add some product to storage +function product_add(product) + local id = uuid.fromstr(product.id) + + box.space.products:insert({ id, product.bucket_id, product.name, product.count}) + + return true +end + +-- product_get - simple select for benches +function product_get(req) + local product = box.space.products:get(uuid.fromstr(req.id)) + + return { + name = product.name, + id = product.id:str() + } +end diff --git a/examples/customer/go-service/main.go b/examples/customer/go-service/main.go index 2fab5b9..0cb49c4 100644 --- a/examples/customer/go-service/main.go +++ b/examples/customer/go-service/main.go @@ -13,8 +13,6 @@ import ( "github.com/google/uuid" "github.com/spf13/viper" "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" - vshardrouter "github.com/tarantool/go-vshard-router" "github.com/tarantool/go-vshard-router/providers/static" ) @@ -121,17 +119,15 @@ func (c *controller) CustomerAddHandler(w http.ResponseWriter, r *http.Request) req.BucketId = bucketID - faces, _, err := c.router.RouterCallImpl(ctx, bucketID, vshardrouter.CallOpts{ - VshardMode: vshardrouter.WriteMode, - PoolMode: pool.RW, - Timeout: time.Minute, - }, "customer_add", []interface{}{req}) + resp, err := c.router.Call(ctx, bucketID, vshardrouter.CallModeRW, "customer_add", []interface{}{req}, vshardrouter.CallOpts{ + Timeout: time.Minute, + }) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } - fmt.Println(faces) + fmt.Println(resp.Get()) } // customer lookup @@ -169,21 +165,19 @@ func (c *controller) CustomerLookupHandler(w http.ResponseWriter, r *http.Reques } bucketID := c.router.RouterBucketIDStrCRC32(customerID) - faces, typedFnc, err := c.router.RouterCallImpl(ctx, bucketID, vshardrouter.CallOpts{ - VshardMode: vshardrouter.ReadMode, - PoolMode: pool.PreferRO, - Timeout: time.Minute, - }, "customer_lookup", []interface{}{csID}) + resp, err := c.router.Call(ctx, bucketID, vshardrouter.CallModeBRO, "customer_lookup", []interface{}{csID}, vshardrouter.CallOpts{ + Timeout: time.Minute, + }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - log.Println(faces) + log.Println(resp.Get()) - resp := &CustomerLookupResponse{} + rsp := &CustomerLookupResponse{} - err = typedFnc(resp) + err = resp.GetTyped(rsp) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/go.mod b/go.mod index 61d166d..9a13892 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.10.0 github.com/tarantool/go-tarantool/v2 v2.2.1 - github.com/vmihailenco/msgpack/v5 v5.3.5 + github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/etcd/client/v2 v2.305.17 go.etcd.io/etcd/client/v3 v3.5.17 go.etcd.io/etcd/server/v3 v3.5.17 diff --git a/go.sum b/go.sum index b49efc5..aa24e4a 100644 --- a/go.sum +++ b/go.sum @@ -335,7 +335,6 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= @@ -354,8 +353,8 @@ github.com/tarantool/go-tarantool/v2 v2.2.1/go.mod h1:hKKeZeCP8Y8+U6ZFS32ot1jHV/ github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= -github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/replicaset.go b/replicaset.go index 8d57d6f..9dbcdb4 100644 --- a/replicaset.go +++ b/replicaset.go @@ -119,39 +119,6 @@ func bucketStatWait(future *tarantool.Future) (BucketStatInfo, error) { return bucketStatResponse.info, nil } -// ReplicaCall perform function on remote storage -// link https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L661 -// Deprecated: ReplicaCall is deprecated, -// because looks like it has a little bit broken interface. -// See https://github.com/tarantool/go-vshard-router/issues/42. -// Use CallAsync instead. -func (rs *Replicaset) ReplicaCall( - ctx context.Context, - opts ReplicasetCallOpts, - fnc string, - args interface{}, -) (interface{}, StorageResultTypedFunc, error) { - if opts.Timeout == 0 { - opts.Timeout = callTimeoutDefault - } - - future := rs.CallAsync(ctx, opts, fnc, args) - - respData, err := future.Get() - if err != nil { - return nil, nil, fmt.Errorf("got error on future.Get(): %w", err) - } - - if len(respData) == 0 { - // Since this method returns the first element of respData by contract, we can't return anything is this case (broken interface) - return nil, nil, fmt.Errorf("%s response data is empty", fnc) - } - - return respData[0], func(result ...interface{}) error { - return future.GetTyped(&result) - }, nil -} - // CallAsync sends async request to remote storage func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future { if opts.Timeout > 0 { diff --git a/sugar.go b/sugar.go index 8821638..3231d15 100644 --- a/sugar.go +++ b/sugar.go @@ -18,9 +18,8 @@ type CallRequest struct { // CallResponse is a backwards-compatible structure with go-tarantool for easier replacement. type CallResponse struct { - rawResp interface{} - getTypedFnc StorageResultTypedFunc - err error + resp VshardRouterCallResp + err error } // NewCallRequest returns a new empty CallRequest. @@ -56,23 +55,17 @@ func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse { bucketID = r.cfg.BucketGetter(ctx) } - vshardMode := ReadMode + vshardMode := CallModeBRO // If the user says he prefers to do it on the master, // then he agrees that it will go to the replica, which means he will not write. if userMode == pool.RW { - vshardMode = WriteMode + vshardMode = CallModeRW } - resp.rawResp, resp.getTypedFnc, resp.err = r.RouterCallImpl(ctx, - bucketID, - CallOpts{ - Timeout: r.cfg.RequestTimeout, - PoolMode: userMode, - VshardMode: vshardMode, - }, - req.fnc, - req.args) + resp.resp, resp.err = r.Call(ctx, bucketID, vshardMode, req.fnc, req.args, CallOpts{ + Timeout: r.cfg.RequestTimeout, + }) return resp } @@ -104,7 +97,7 @@ func (resp *CallResponse) GetTyped(result interface{}) error { return resp.err } - return resp.getTypedFnc(result) + return resp.resp.GetTyped(result) } // Get implementation now works synchronously for response. @@ -114,5 +107,5 @@ func (resp *CallResponse) Get() ([]interface{}, error) { return nil, resp.err } - return []interface{}{resp.rawResp}, nil + return resp.resp.Get() } diff --git a/tarantool_test.go b/tarantool_test.go index 2e53dbe..c29dff1 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -2,7 +2,9 @@ package vshard_router_test //nolint:revive import ( "context" + "fmt" "log" + "math/rand" "os" "testing" "time" @@ -10,14 +12,18 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/pool" "github.com/tarantool/go-tarantool/v2/test_helpers" vshardrouter "github.com/tarantool/go-vshard-router" "github.com/tarantool/go-vshard-router/providers/static" chelper "github.com/tarantool/go-vshard-router/test_helper" + "github.com/vmihailenco/msgpack/v5" ) -const instancesCount = 4 +const ( + instancesCount = 4 + totalBucketCount = 100 + username = "guest" +) // init servers from our cluster var serverNames = map[string]string{ @@ -101,7 +107,7 @@ func runTestMain(m *testing.M) int { for name, addr := range serverNames { dialers[i] = tarantool.NetDialer{ Address: addr, - User: "guest", + User: username, } opts[i] = test_helpers.StartOpts{ @@ -136,9 +142,9 @@ func TestRouter_ClusterBootstrap(t *testing.T) { ctx := context.Background() router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TotalBucketCount: 100, + TotalBucketCount: totalBucketCount, TopologyProvider: static.NewProvider(topology), - User: "guest", + User: username, }) require.NotNil(t, router) require.NoError(t, err) @@ -152,50 +158,312 @@ func TestRouter_ClusterBootstrap(t *testing.T) { } } -func TestRouter_RouterCallImpl_Decoding(t *testing.T) { +// for tarantool 3.0 uuid is not required +func TestNewRouter_IgnoreUUID(t *testing.T) { + ctx := context.Background() + + _, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TotalBucketCount: 100, + TopologyProvider: static.NewProvider(noUUIDTopology), + User: username, + }) + + require.NoError(t, err) +} + +func TestRouter_Topology(t *testing.T) { + t.Parallel() + ctx := context.Background() - type Product struct { - BucketID uint64 `msgpack:"bucket_id"` - ID string `msgpack:"id"` - Name string `msgpack:"name"` - Count uint64 `msgpack:"count"` + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(topology), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, + }) + + require.Nil(t, err, "NewRouter finished successfully") + + var rsInfo vshardrouter.ReplicasetInfo + var insInfo vshardrouter.InstanceInfo + for k, replicas := range topology { + if len(replicas) == 0 { + continue + } + rsInfo = k + //nolint:gosec + insInfo = replicas[rand.Int()%len(replicas)] } + tCtrl := router.Topology() + + // remove some random replicaset + _ = tCtrl.RemoveReplicaset(ctx, rsInfo.UUID) + // add it again + err = tCtrl.AddReplicasets(ctx, map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{rsInfo: topology[rsInfo]}) + require.Nil(t, err, "AddReplicasets finished successfully") + + // remove some random instance + err = tCtrl.RemoveInstance(ctx, rsInfo.UUID, insInfo.Name) + require.Nil(t, err, "RemoveInstance finished successfully") + + // add it again + err = tCtrl.AddInstance(ctx, rsInfo.UUID, insInfo) + require.Nil(t, err, "AddInstance finished successfully") +} + +type CustomDecodingStruct struct { + Name string + Age int +} + +func (c *CustomDecodingStruct) DecodeMsgpack(d *msgpack.Decoder) error { + arrLen, err := d.DecodeArrayLen() + if err != nil { + return err + } + + if arrLen != 2 { + return fmt.Errorf("length must be equal 2") + } + + name, err := d.DecodeString() + if err != nil { + return err + } + + c.Name = name + + age, err := d.DecodeInt() + if err != nil { + return err + } + + c.Age = age + + return nil +} + +func TestRouter_Call(t *testing.T) { + t.Parallel() + + ctx := context.Background() + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TotalBucketCount: 100, TopologyProvider: static.NewProvider(topology), - User: "guest", + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, }) - require.NotNil(t, router) - require.NoError(t, err) + require.NoError(t, err, "NewRouter started successfully") - // bootstrap and discovery again if this is single test testing - require.NoError(t, router.ClusterBootstrap(ctx, true)) - require.NoError(t, router.DiscoveryAllBuckets(ctx)) + bucketID := randBucketID(totalBucketCount) - id := uuid.New() + rs, err := router.BucketResolve(ctx, bucketID) + require.NoError(t, err, "BucketResolve with no err") - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, _, err = router.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second}, - "echo", - []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}, "test"}) + t.Run("proto test", func(t *testing.T) { + const maxRespLen = 3 + for argLen := 0; argLen <= maxRespLen; argLen++ { + args := make([]interface{}, 0, argLen) - require.NoError(t, err) + for i := 0; i < argLen; i++ { + args = append(args, "arg") + } + + var routerOpts vshardrouter.CallOpts + resp, err := router.CallRW(ctx, bucketID, "echo", args, routerOpts) + require.NoError(t, err, "router.CallRW with no err") + + var resViaVshard interface{} + var resDirect interface{} + var resGet []interface{} + + err = resp.GetTyped(&resViaVshard) + require.NoError(t, err, "GetTyped with no err") + + resGet, err = resp.Get() + require.NoError(t, err, "Get with no err") + + require.Equal(t, resViaVshard, resGet, "resViaVshard and resGet are equal") + + var rsOpts vshardrouter.ReplicasetCallOpts + + err = rs.CallAsync(ctx, rsOpts, "echo", args).GetTyped(&resDirect) + require.NoError(t, err, "rs.CallAsync.GetTyped with no error") + + require.Equalf(t, resDirect, resViaVshard, "resDirect != resViaVshard on argLen %d", argLen) + } + }) + + t.Run("custom decoders works valid", func(t *testing.T) { + res := &CustomDecodingStruct{} + args := []interface{}{"Maksim", 21} + + resp, err := router.CallRW(ctx, bucketID, "echo", &args, vshardrouter.CallOpts{}) + require.NoError(t, err, "router.CallRW with no err") + + err = resp.GetTyped(res) + require.NoError(t, err) + + require.Equal(t, &CustomDecodingStruct{Name: "Maksim", Age: 21}, res) + }) + + t.Run("router.Call err", func(t *testing.T) { + callMode := vshardrouter.CallModeRO + args := []interface{}{} + callOpts := vshardrouter.CallOpts{} + + _, err := router.Call(ctx, totalBucketCount+1, callMode, "echo", args, callOpts) + require.Error(t, err, "RouterCall echo finished with err when bucketID is out of range") + + _, err = router.Call(ctx, 0, callMode, "echo", args, callOpts) + require.Error(t, err, "RouterCall echo finished with err when bucketID is 0") + + _, err = router.Call(ctx, bucketID, callMode, "echo", nil, callOpts) + require.Error(t, err, "RouterCall echo finised with err on nil args") + + _, err = router.Call(ctx, bucketID, callMode, "raise_luajit_error", args, callOpts) + require.NotNil(t, err, "RouterCall raise_luajit_error finished with err") + + _, err = router.Call(ctx, bucketID, callMode, "raise_client_error", args, callOpts) + require.NotNil(t, err, "RouterCall raise_client_error finished with err") + }) + + t.Run("router.Call simulate vshard error", func(t *testing.T) { + rsMap := router.RouterRouteAll() + + // 1. Replace replicaset for bucketID + for k, v := range rsMap { + if rs != v { + res, err := router.BucketSet(bucketID, k) + require.Nil(t, err, "BucketSet finished with no err") + require.Equal(t, res, v) + break + } + } + + // 2. Try to call something + _, err = router.Call(ctx, bucketID, vshardrouter.CallModeRO, "echo", []interface{}{}, vshardrouter.CallOpts{}) + require.Nil(t, err, "RouterCallImpl echo finished with no err even on dirty bucket map") + }) } -// for tarantool 3.0 uuid is not required -func TestNewRouter_IgnoreUUID(t *testing.T) { +func randBucketID(totalBucketCount uint64) uint64 { + //nolint:gosec + return (rand.Uint64() % totalBucketCount) + 1 +} + +// BENCH + +type Product struct { + BucketID uint64 `msgpack:"bucket_id"` + ID string `msgpack:"id"` + Name string `msgpack:"name"` + Count uint64 `msgpack:"count"` +} + +func BenchmarkCallSimpleInsert_GO_Call(b *testing.B) { + b.StopTimer() + ctx := context.Background() - _, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TotalBucketCount: 100, - TopologyProvider: static.NewProvider(noUUIDTopology), - User: "guest", + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(topology), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, + RequestTimeout: time.Minute, }) + require.NoError(b, err) - require.NoError(t, err) + err = router.ClusterBootstrap(ctx, true) + require.NoError(b, err) + + b.StartTimer() + for i := 0; i < b.N; i++ { + id := uuid.New() + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + _, err := router.Call( + ctx, + bucketID, + vshardrouter.CallModeRW, + "product_add", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, + vshardrouter.CallOpts{Timeout: 10 * time.Second}) + require.NoError(b, err) + } + + b.ReportAllocs() +} + +func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { + b.StopTimer() + + ctx := context.Background() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(topology), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, + }) + require.NoError(b, err) + + err = router.ClusterBootstrap(ctx, true) + require.NoError(b, err) + + ids := make([]uuid.UUID, b.N) + + for i := 0; i < b.N; i++ { + id := uuid.New() + ids[i] = id + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + _, err := router.Call( + ctx, + bucketID, + vshardrouter.CallModeRW, + "product_add", + []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, + vshardrouter.CallOpts{}, + ) + require.NoError(b, err) + } + + type Request struct { + ID string `msgpack:"id"` + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + id := ids[i] + + bucketID := router.RouterBucketIDStrCRC32(id.String()) + resp, err1 := router.Call( + ctx, + bucketID, + vshardrouter.CallModeBRO, + "product_get", + []interface{}{&Request{ID: id.String()}}, + vshardrouter.CallOpts{Timeout: time.Second}, + ) + + var product Product + + err2 := resp.GetTyped(&[]interface{}{&product}) + + b.StopTimer() + require.NoError(b, err1) + require.NoError(b, err2) + b.StartTimer() + } + + b.ReportAllocs() } diff --git a/tests/tnt/call_bench_test.go b/tests/tnt/call_bench_test.go index 2be9103..c191280 100644 --- a/tests/tnt/call_bench_test.go +++ b/tests/tnt/call_bench_test.go @@ -20,79 +20,6 @@ type Product struct { Count uint64 `msgpack:"count"` } -func BenchmarkCallSimpleInsert_GO_RouterCall(b *testing.B) { - b.StopTimer() - skipOnInvalidRun(b) - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - RequestTimeout: time.Minute, - }) - require.NoError(b, err) - - b.StartTimer() - for i := 0; i < b.N; i++ { - id := uuid.New() - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, _, err := router.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW, Timeout: 10 * time.Second}, - "product_add", - []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}) - require.NoError(b, err) - } - - b.ReportAllocs() -} - -func BenchmarkCallSimpleInsert_GO_Call(b *testing.B) { - b.StopTimer() - skipOnInvalidRun(b) - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - RequestTimeout: time.Minute, - }) - require.NoError(b, err) - - b.StartTimer() - for i := 0; i < b.N; i++ { - id := uuid.New() - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, err := router.Call( - ctx, - bucketID, - vshardrouter.VshardRouterCallModeRW, - "product_add", - []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, - vshardrouter.VshardRouterCallOptions{Timeout: 10 * time.Second}) - require.NoError(b, err) - } - - b.ReportAllocs() -} - func BenchmarkCallSimpleInsert_Lua(b *testing.B) { b.StopTimer() @@ -129,69 +56,7 @@ func BenchmarkCallSimpleInsert_Lua(b *testing.B) { b.ReportAllocs() } -func BenchmarkCallSimpleSelect_GO_RouterCall(b *testing.B) { - b.StopTimer() - skipOnInvalidRun(b) - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - require.NoError(b, err) - - ids := make([]uuid.UUID, b.N) - - for i := 0; i < b.N; i++ { - id := uuid.New() - ids[i] = id - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, _, err := router.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW}, - "product_add", - []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}) - require.NoError(b, err) - } - - type Request struct { - ID string `msgpack:"id"` - } - - b.StartTimer() - for i := 0; i < b.N; i++ { - id := ids[i] - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, getTyped, err1 := router.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.ReadMode, PoolMode: pool.ANY, Timeout: time.Second}, - "product_get", - []interface{}{&Request{ID: id.String()}}) - - var product Product - err2 := getTyped(&product) - - b.StopTimer() - require.NoError(b, err1) - require.NoError(b, err2) - b.StartTimer() - } - - b.ReportAllocs() -} - -func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { +func BenchmarkCallSimpleSelect_Lua(b *testing.B) { b.StopTimer() skipOnInvalidRun(b) @@ -219,75 +84,10 @@ func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { _, err := router.Call( ctx, bucketID, - vshardrouter.VshardRouterCallModeRW, + vshardrouter.CallModeRW, "product_add", []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}, - vshardrouter.VshardRouterCallOptions{}, - ) - require.NoError(b, err) - } - - type Request struct { - ID string `msgpack:"id"` - } - - b.StartTimer() - for i := 0; i < b.N; i++ { - id := ids[i] - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - resp, err1 := router.Call( - ctx, - bucketID, - vshardrouter.VshardRouterCallModeBRO, - "product_get", - []interface{}{&Request{ID: id.String()}}, - vshardrouter.VshardRouterCallOptions{Timeout: time.Second}, - ) - - var product Product - err2 := resp.GetTyped([]interface{}{&product}) - - b.StopTimer() - require.NoError(b, err1) - require.NoError(b, err2) - b.StartTimer() - } - - b.ReportAllocs() -} - -func BenchmarkCallSimpleSelect_Lua(b *testing.B) { - b.StopTimer() - skipOnInvalidRun(b) - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - require.NoError(b, err) - - ids := make([]uuid.UUID, b.N) - - for i := 0; i < b.N; i++ { - id := uuid.New() - ids[i] = id - - bucketID := router.RouterBucketIDStrCRC32(id.String()) - _, _, err := router.RouterCallImpl( - ctx, - bucketID, - vshardrouter.CallOpts{VshardMode: vshardrouter.WriteMode, PoolMode: pool.RW}, - "product_add", - []interface{}{&Product{Name: "test-go", BucketID: bucketID, ID: id.String(), Count: 3}}) + vshardrouter.CallOpts{}) require.NoError(b, err) } diff --git a/tests/tnt/concurrent_topology_test.go b/tests/tnt/concurrent_topology_test.go index 7e7d61d..4115435 100644 --- a/tests/tnt/concurrent_topology_test.go +++ b/tests/tnt/concurrent_topology_test.go @@ -150,11 +150,9 @@ func TestConncurrentTopologyChange(t *testing.T) { bucketID := randBucketID(totalBucketCount) args := []interface{}{"arg1"} - callOpts := vshardrouter.CallOpts{ - VshardMode: vshardrouter.ReadMode, - } + callOpts := vshardrouter.CallOpts{} - _, _, _ = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) + _, _ = router.Call(ctx, bucketID, vshardrouter.CallModeBRO, "echo", args, callOpts) } }() diff --git a/tests/tnt/replicaset_test.go b/tests/tnt/replicaset_test.go index a980c19..52706e9 100644 --- a/tests/tnt/replicaset_test.go +++ b/tests/tnt/replicaset_test.go @@ -12,90 +12,6 @@ import ( "github.com/tarantool/go-vshard-router/providers/static" ) -func TestReplicasetReplicaCall(t *testing.T) { - skipOnInvalidRun(t) - - t.Parallel() - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - - require.NoError(t, err, "NewRouter finished successfully") - - rsMap := router.RouterRouteAll() - - var rs *vshardrouter.Replicaset - // pick random rs - for _, v := range rsMap { - rs = v - break - } - - _ = rs.String() // just for coverage - - callOpts := vshardrouter.ReplicasetCallOpts{ - PoolMode: pool.ANY, - } - - _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", nil) - require.NotNil(t, err, "ReplicaCall finished with err on nil args") - - _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", []interface{}{}) - require.NotNil(t, err, "ReplicaCall returns err on empty response (broken interface)") - - // args len is 1 - args := []interface{}{"arg1"} - resp, getTyped, err := rs.ReplicaCall(ctx, callOpts, "echo", args) - require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args) - require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args) - var typed interface{} - err = getTyped(&typed) - require.Nilf(t, err, "getTyped finished with no err for args: %v", args) - require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args) - - // args len is 2 - args = []interface{}{"arg1", "arg2"} - resp, getTyped, err = rs.ReplicaCall(ctx, callOpts, "echo", args) - require.Nilf(t, err, "ReplicaCall finished with no err for args: %v", args) - require.Equalf(t, args[0], resp, "ReplicaCall resp ok for args: %v", args) - typed = nil // set to nil, otherwise getTyped tries to use the old content - err = getTyped(&typed) - require.Nilf(t, err, "getTyped finished with no err for args: %v", args) - require.Equalf(t, args[0], typed, "getTyped result is ok for args: %v", args) - - // don't decode assert error - args = []interface{}{nil, "non nil"} - _, _, err = rs.ReplicaCall(ctx, callOpts, "echo", args) - require.Nil(t, err, "ReplicaCall doesn't try decode assert error") - - args = []interface{}{2} - callOpts.Timeout = 500 * time.Millisecond - start := time.Now() - _, _, err = rs.ReplicaCall(ctx, callOpts, "sleep", args) - duration := time.Since(start) - require.NotNil(t, err, "ReplicaCall timeout happened") - require.Less(t, duration, 600*time.Millisecond, "ReplicaCall timeout works correctly") - callOpts.Timeout = 0 // return back default value - - // raise_luajit_error - _, _, err = rs.ReplicaCall(ctx, callOpts, "raise_luajit_error", nil) - require.NotNil(t, err, "raise_luajit_error returns error") - - // raise_client_error - _, _, err = rs.ReplicaCall(ctx, callOpts, "raise_client_error", nil) - require.NotNil(t, err, "raise_client_error returns error") -} - func TestReplicsetCallAsync(t *testing.T) { skipOnInvalidRun(t) diff --git a/tests/tnt/router_call_test.go b/tests/tnt/router_call_test.go deleted file mode 100644 index 60c797d..0000000 --- a/tests/tnt/router_call_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package tnt - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/tarantool/go-tarantool/v2/pool" - vshardrouter "github.com/tarantool/go-vshard-router" - "github.com/tarantool/go-vshard-router/providers/static" -) - -func TestRouterCallProto(t *testing.T) { - skipOnInvalidRun(t) - - t.Parallel() - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - - require.Nil(t, err, "NewRouter finished successfully") - - bucketID := randBucketID(totalBucketCount) - arg1, arg2 := "arg1", "arg2" - args := []interface{}{arg1, arg2} - callOpts := vshardrouter.CallOpts{ - VshardMode: vshardrouter.ReadMode, - PoolMode: pool.PreferRO, - } - - resp, getTyped, err := router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) - require.Nil(t, err, "RouterCallImpl echo finished with no err") - require.EqualValues(t, args, resp, "RouterCallImpl echo resp correct") - var arg1Got, arg2Got string - err = getTyped(&arg1Got, &arg2Got) - require.Nil(t, err, "RouterCallImpl getTyped call ok") - require.Equal(t, arg1, arg1Got, "RouterCallImpl getTyped arg1 res ok") - require.Equal(t, arg2, arg2Got, "RouterCallImpl getTyped arg2 res ok") - - _, _, err = router.RouterCallImpl(ctx, totalBucketCount+1, callOpts, "echo", args) - require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is out of range") - - _, _, err = router.RouterCallImpl(ctx, 0, callOpts, "echo", args) - require.Error(t, err, "RouterCallImpl echo finished with err when bucketID is 0") - - _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", nil) - require.NotNil(t, err, "RouterCallImpl echo finised with nil args") - - _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_luajit_error", args) - require.NotNil(t, err, "RouterCallImpl raise_luajit_error finished with err") - - _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "raise_client_error", args) - require.NotNil(t, err, "RouterCallImpl raise_client_error finished with err") - - // maxRespLen is due to: - // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L3130 - const maxRespLen = 3 - for argLen := 0; argLen <= maxRespLen+1; argLen++ { - args := []interface{}{} - for i := 0; i < argLen; i++ { - args = append(args, "arg") - } - - resp, getTyped, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) - require.Nilf(t, err, "RouterCallImpl no err for arglen %d", argLen) - - expect := args - if argLen > maxRespLen { - expect = expect[:maxRespLen] - } - - require.Equal(t, expect, resp, "RouterCallImpl resp ok for arglen %d", argLen) - var typed interface{} - err = getTyped(&typed) - require.Nil(t, err, "RouterCallImpl getTyped no err for arglen %d", argLen) - - if argLen > 0 { - // TODO: Should we handle multiple return values in getTyped? - require.Equal(t, expect[0], typed, "RouterCallImpl getTyped resp ok for arglen %d", argLen) - } - } - - // simulate vshard error - - // 1. Replace replicaset for bucketID - rs, err := router.BucketResolve(ctx, bucketID) - require.Nil(t, err, "BucketResolve finished with no err") - rsMap := router.RouterRouteAll() - - for k, v := range rsMap { - if rs != v { - res, err := router.BucketSet(bucketID, k) - require.Nil(t, err, "BucketSet finished with no err") - require.Equal(t, res, v) - break - } - } - - // 2. Try to call something - _, _, err = router.RouterCallImpl(ctx, bucketID, callOpts, "echo", args) - require.Nil(t, err, "RouterCallImpl echo finished with no err even on dirty bucket map") -} diff --git a/tests/tnt/tnt_test.go b/tests/tnt/tnt_test.go index ac9e320..3852e9a 100644 --- a/tests/tnt/tnt_test.go +++ b/tests/tnt/tnt_test.go @@ -65,18 +65,3 @@ func randBucketID(totalBucketCount uint64) uint64 { //nolint:gosec return (rand.Uint64() % totalBucketCount) + 1 } - -func TestConcurrentRouterCall(t *testing.T) { - /* TODO - 1) Invalidate some random bucket id - 2) concurrent call of replicalcall - */ - _ = t -} - -func TestRetValues(t *testing.T) { - /* TODO - 1) Replicacall returns no value, 1 value, 2 values, 3 values, etc..., assert, lua error? - */ - _ = t -} diff --git a/tests/tnt/topology_test.go b/tests/tnt/topology_test.go deleted file mode 100644 index dbcd8d3..0000000 --- a/tests/tnt/topology_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package tnt - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/require" - vshardrouter "github.com/tarantool/go-vshard-router" - "github.com/tarantool/go-vshard-router/providers/static" -) - -func TestTopology(t *testing.T) { - skipOnInvalidRun(t) - - t.Parallel() - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - - require.Nil(t, err, "NewRouter finished successfully") - - var rsInfo vshardrouter.ReplicasetInfo - var insInfo vshardrouter.InstanceInfo - for k, replicas := range cfg { - if len(replicas) == 0 { - continue - } - rsInfo = k - //nolint:gosec - insInfo = replicas[rand.Int()%len(replicas)] - } - - // remove some random replicaset - _ = router.RemoveReplicaset(ctx, rsInfo.UUID) - // add it again - err = router.AddReplicasets(ctx, map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{rsInfo: cfg[rsInfo]}) - require.Nil(t, err, "AddReplicasets finished successfully") - - // remove some random instance - err = router.RemoveInstance(ctx, rsInfo.UUID, insInfo.Name) - require.Nil(t, err, "RemoveInstance finished successfully") - - // add it again - err = router.AddInstance(ctx, rsInfo.UUID, insInfo) - require.Nil(t, err, "AddInstance finished successfully") -}