diff --git a/CHANGELOG.md b/CHANGELOG.md index baa5682..13c3485 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ CHANGES: * VshardRouterCallMode type renamed to CallMode for simplicity. * StorageResultTypedFunc type removed as useless type. * Updated msgpack version from v5.3.5 to v5.4.1. +* Replicaset identifier now is replicaset name instead uuid. TESTS: @@ -19,6 +20,7 @@ TESTS: * Moved TestReplicasetReplicaCall and Go benches from tests/tnt to tarantool_test.go . * TestRouterCallProto rewrote. * Start using constants in tarantool_test.go instead duplicate variables. +* TestRouterMapCall moved to tarantool_test.go and renamed to TestRouter_RouterMapCallRWImpl. ## v1.3.2 diff --git a/api.go b/api.go index ea9b1e4..7c72f11 100644 --- a/api.go +++ b/api.go @@ -6,8 +6,6 @@ import ( "fmt" "time" - "github.com/google/uuid" - "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/go-tarantool/v2/pool" "github.com/vmihailenco/msgpack/v5" @@ -313,27 +311,35 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode, // We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663 r.BucketReset(bucketID) - if vshardError.Destination != "" { - destinationUUID, err := uuid.Parse(vshardError.Destination) - if err != nil { - return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w", - vshardStorageClientCall, vshardError, err) - } - + if destination := vshardError.Destination; destination != "" { var loggedOnce bool for { - idToReplicasetRef := r.getIDToReplicaset() - if _, ok := idToReplicasetRef[destinationUUID]; ok { - _, err := r.BucketSet(bucketID, destinationUUID) + nameToReplicasetRef := r.getNameToReplicaset() + + _, destinationExists := nameToReplicasetRef[destination] + + if !destinationExists { + // for older logic with uuid we must support backward compatibility + // if destination is uuid and not name, lets find it too + for _, rsRef := range nameToReplicasetRef { + if rsRef.info.UUID.String() == destination { + destinationExists = true + break + } + } + } + + if destinationExists { + _, err := r.BucketSet(bucketID, destination) if err == nil { break // breaks loop } - r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err) + r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destination, err) } if !loggedOnce { r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+ - "update configuration", destinationUUID) + "update configuration", destination) loggedOnce = true } @@ -543,13 +549,14 @@ func (r *Router) RouterMapCallRWImpl( fnc string, args interface{}, opts CallOpts, -) (map[uuid.UUID]interface{}, error) { +) (map[string]interface{}, error) { // nolint:gosimple return RouterMapCallRW[interface{}](r, ctx, fnc, args, RouterMapCallRWOptions{Timeout: opts.Timeout}) } type replicasetFuture struct { - uuid uuid.UUID + // replicaset name + name string future *tarantool.Future } @@ -561,7 +568,7 @@ type replicasetFuture struct { // see: https://github.com/golang/go/issues/49085. func RouterMapCallRW[T any](r *Router, ctx context.Context, fnc string, args interface{}, opts RouterMapCallRWOptions, -) (map[uuid.UUID]T, error) { +) (map[string]T, error) { const vshardStorageServiceCall = "vshard.storage._call" timeout := callTimeoutDefault @@ -572,14 +579,14 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, timeStart := time.Now() refID := r.refID.Add(1) - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() defer func() { // call function "storage_unref" if map_callrw is failed or successed storageUnrefReq := tarantool.NewCallRequest(vshardStorageServiceCall). Args([]interface{}{"storage_unref", refID}) - for _, rs := range idToReplicasetRef { + for _, rs := range nameToReplicasetRef { future := rs.conn.Do(storageUnrefReq, pool.RW) future.SetError(nil) // TODO: does it cancel the request above or not? } @@ -594,12 +601,12 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, Context(ctx). Args([]interface{}{"storage_ref", refID, timeout}) - var rsFutures = make([]replicasetFuture, 0, len(idToReplicasetRef)) + var rsFutures = make([]replicasetFuture, 0, len(nameToReplicasetRef)) // ref stage: send concurrent ref requests - for uuid, rs := range idToReplicasetRef { + for name, rs := range nameToReplicasetRef { rsFutures = append(rsFutures, replicasetFuture{ - uuid: uuid, + name: name, future: rs.conn.Do(storageRefReq, pool.RW), }) } @@ -612,11 +619,11 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, var storageRefResponse storageRefResponseProto if err := rsFuture.future.GetTyped(&storageRefResponse); err != nil { - return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err) + return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.name, err) } if storageRefResponse.err != nil { - return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, storageRefResponse.err) + return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.name, storageRefResponse.err) } totalBucketCount += storageRefResponse.bucketCount @@ -636,33 +643,33 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context, rsFutures = rsFutures[0:0] // map stage: send concurrent map requests - for uuid, rs := range idToReplicasetRef { + for name, rs := range nameToReplicasetRef { rsFutures = append(rsFutures, replicasetFuture{ - uuid: uuid, + name: name, future: rs.conn.Do(storageMapReq, pool.RW), }) } // map stage: get their responses - idToResult := make(map[uuid.UUID]T) + nameToResult := make(map[string]T) for _, rsFuture := range rsFutures { var storageMapResponse storageMapResponseProto[T] err := rsFuture.future.GetTyped(&storageMapResponse) if err != nil { - return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err) + return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.name, err) } if !storageMapResponse.ok { - return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.uuid, storageMapResponse.err) + return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.name, storageMapResponse.err) } - idToResult[rsFuture.uuid] = storageMapResponse.value + nameToResult[rsFuture.name] = storageMapResponse.value } r.metrics().RequestDuration(time.Since(timeStart), true, true) - return idToResult, nil + return nameToResult, nil } // RouterRoute get replicaset object by bucket identifier. @@ -672,15 +679,15 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset, } // RouterRouteAll return map of all replicasets. -func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset { - idToReplicasetRef := r.getIDToReplicaset() +func (r *Router) RouterRouteAll() map[string]*Replicaset { + nameToReplicasetRef := r.getNameToReplicaset() // Do not expose the original map to prevent unauthorized modification. - idToReplicasetCopy := make(map[uuid.UUID]*Replicaset) + nameToReplicasetCopy := make(map[string]*Replicaset) - for k, v := range idToReplicasetRef { - idToReplicasetCopy[k] = v + for k, v := range nameToReplicasetRef { + nameToReplicasetCopy[k] = v } - return idToReplicasetCopy + return nameToReplicasetCopy } diff --git a/discovery.go b/discovery.go index 12b1265..828b6b7 100644 --- a/discovery.go +++ b/discovery.go @@ -9,7 +9,6 @@ import ( "golang.org/x/sync/errgroup" - "github.com/google/uuid" "github.com/tarantool/go-tarantool/v2" ) @@ -70,18 +69,18 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica } func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() type rsFuture struct { - rsID uuid.UUID + rsName string future *tarantool.Future } - var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef)) + var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef)) // Send a bunch of parallel requests - for rsID, rs := range idToReplicasetRef { + for rsName, rs := range nameToReplicasetRef { rsFutures = append(rsFutures, rsFuture{ - rsID: rsID, + rsName: rsName, future: rs.bucketStatAsync(ctx, bucketID), }) } @@ -90,16 +89,16 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl if _, err := bucketStatWait(rsFuture.future); err != nil { var vshardError StorageCallVShardError if !errors.As(err, &vshardError) { - r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err) + r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsName, err) } // just skip, bucket may not belong to this replicaset continue } // It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them. - rs, err := r.BucketSet(bucketID, rsFuture.rsID) + rs, err := r.BucketSet(bucketID, rsFuture.rsName) if err != nil { - r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err) + r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsName, bucketID, err) return nil, newVShardErrorNoRouteToBucket(bucketID) } @@ -127,21 +126,21 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700 // https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37 func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() view := r.getConsistentView() type rsFuture struct { rs *Replicaset - rsID uuid.UUID + rsName string future *tarantool.Future } - var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef)) + var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef)) // Send a bunch of parallel requests - for rsID, rs := range idToReplicasetRef { + for rsName, rs := range nameToReplicasetRef { rsFutures = append(rsFutures, rsFuture{ rs: rs, - rsID: rsID, + rsName: rsName, future: rs.bucketsDiscoveryAsync(ctx, bucketIDToFind), }) } @@ -151,7 +150,7 @@ func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) for _, rsFuture := range rsFutures { resp, err := bucketsDiscoveryWait(rsFuture.future) if err != nil { - r.log().Errorf(ctx, "bucketSearchBatched: bucketsDiscoveryWait error for %v: %v", rsFuture.rsID, err) + r.log().Errorf(ctx, "bucketSearchBatched: bucketsDiscoveryWait error for %v: %v", rsFuture.rsName, err) // just skip, we still may find our bucket in another replicaset continue } @@ -231,9 +230,9 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error { errGr, ctx := errgroup.WithContext(ctx) view := r.getConsistentView() - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() - for _, rs := range idToReplicasetRef { + for _, rs := range nameToReplicasetRef { rs := rs errGr.Go(func() error { diff --git a/replicaset.go b/replicaset.go index 9dbcdb4..4d66c60 100644 --- a/replicaset.go +++ b/replicaset.go @@ -13,16 +13,35 @@ import ( "github.com/vmihailenco/msgpack/v5/msgpcode" ) +// ReplicasetInfo represents information about a replicaset, including its name, unique identifier, weight, and state. type ReplicasetInfo struct { - Name string - UUID uuid.UUID - Weight float64 - PinnedCount uint64 + // Name — the name of the replicaset. + // This string is required and is used to identify the replicaset. + Name string + // UUID — the unique identifier of the replica. + // This is an optional value that can be used to uniquely distinguish each replicaset. + UUID uuid.UUID + // Weight — the weight of the replicaset. + // This floating-point number may be used to determine the importance or priority of the replicaset. + Weight float64 + // PinnedCount — the number of pinned items. + // This value indicates how many items or tasks are associated with the replicaset. + PinnedCount uint64 + // IgnoreDisbalance — a flag indicating whether to ignore load imbalance when distributing tasks. + // If true, the replicaset will be excluded from imbalance checks. IgnoreDisbalance bool } -func (rsi ReplicasetInfo) String() string { - return fmt.Sprintf("{name: %s, uuid: %s}", rsi.Name, rsi.UUID) +func (ri ReplicasetInfo) Validate() error { + if ri.Name == "" { + return fmt.Errorf("%w: rsInfo.Name is empty", ErrInvalidReplicasetInfo) + } + + return nil +} + +func (ri ReplicasetInfo) String() string { + return fmt.Sprintf("{name: %s, uuid: %s}", ri.Name, ri.UUID) } type ReplicasetCallOpts struct { diff --git a/tarantool_test.go b/tarantool_test.go index c29dff1..62559d5 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -200,17 +200,17 @@ func TestRouter_Topology(t *testing.T) { tCtrl := router.Topology() // remove some random replicaset - _ = tCtrl.RemoveReplicaset(ctx, rsInfo.UUID) + _ = tCtrl.RemoveReplicaset(ctx, rsInfo.Name) // add it again err = tCtrl.AddReplicasets(ctx, map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{rsInfo: topology[rsInfo]}) require.Nil(t, err, "AddReplicasets finished successfully") // remove some random instance - err = tCtrl.RemoveInstance(ctx, rsInfo.UUID, insInfo.Name) + err = tCtrl.RemoveInstance(ctx, rsInfo.Name, insInfo.Name) require.Nil(t, err, "RemoveInstance finished successfully") // add it again - err = tCtrl.AddInstance(ctx, rsInfo.UUID, insInfo) + err = tCtrl.AddInstance(ctx, rsInfo.Name, insInfo) require.Nil(t, err, "AddInstance finished successfully") } @@ -467,3 +467,92 @@ func BenchmarkCallSimpleSelect_GO_Call(b *testing.B) { b.ReportAllocs() } + +func TestRouter_RouterMapCallRWImpl(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ + TopologyProvider: static.NewProvider(topology), + DiscoveryTimeout: 5 * time.Second, + DiscoveryMode: vshardrouter.DiscoveryModeOn, + TotalBucketCount: totalBucketCount, + User: username, + }) + require.Nil(t, err, "NewRouter created successfully") + + err = router.ClusterBootstrap(ctx, false) + require.NoError(t, err) + + callOpts := vshardrouter.CallOpts{} + + const arg = "arg1" + + // Enusre that RouterMapCallRWImpl works at all + echoArgs := []interface{}{arg} + resp, err := router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) + require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") + + for k, v := range resp { + require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) + } + + echoArgs = []interface{}{1} + respInt, err := vshardrouter.RouterMapCallRW[int](router, ctx, "echo", echoArgs, vshardrouter.RouterMapCallRWOptions{}) + require.NoError(t, err, "RouterMapCallRW[int] echo finished with no err") + for k, v := range respInt { + require.Equalf(t, 1, v, "RouterMapCallRW[int] value ok for %v", k) + } + + // RouterMapCallRWImpl returns only one value + echoArgs = []interface{}{arg, "arg2"} + resp, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) + require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") + + for k, v := range resp { + require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) + } + + // RouterMapCallRWImpl returns nil when no return value + noArgs := []interface{}{} + resp, err = router.RouterMapCallRWImpl(ctx, "echo", noArgs, callOpts) + require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") + + for k, v := range resp { + require.Equalf(t, nil, v, "RouterMapCallRWImpl value ok for %v", k) + } + + // Ensure that RouterMapCallRWImpl sends requests concurrently + const sleepToSec int = 1 + sleepArgs := []interface{}{sleepToSec} + + start := time.Now() + _, err = router.RouterMapCallRWImpl(ctx, "sleep", sleepArgs, vshardrouter.CallOpts{ + Timeout: 2 * time.Second, // because default timeout is 0.5 sec + }) + duration := time.Since(start) + + require.NoError(t, err, "RouterMapCallRWImpl sleep finished with no err") + require.Greater(t, len(topology), 1, "There are more than one replicasets") + require.Less(t, duration, 1200*time.Millisecond, "Requests were send concurrently") + + // RouterMapCallRWImpl returns err on raise_luajit_error + _, err = router.RouterMapCallRWImpl(ctx, "raise_luajit_error", noArgs, callOpts) + require.NotNil(t, err, "RouterMapCallRWImpl raise_luajit_error finished with error") + + // RouterMapCallRWImpl invalid usage + _, err = router.RouterMapCallRWImpl(ctx, "echo", nil, callOpts) + require.NotNil(t, err, "RouterMapCallRWImpl with nil args finished with error") + + // Ensure that RouterMapCallRWImpl doesn't work when it mean't to + for rsInfo := range topology { + errs := router.RemoveReplicaset(ctx, rsInfo.Name) + require.Emptyf(t, errs, "%s successfully removed from router", rsInfo.Name) + + break + } + + _, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) + require.NotNilf(t, err, "RouterMapCallRWImpl failed on not full cluster") +} diff --git a/tests/tnt/concurrent_topology_test.go b/tests/tnt/concurrent_topology_test.go index 4115435..582a6ce 100644 --- a/tests/tnt/concurrent_topology_test.go +++ b/tests/tnt/concurrent_topology_test.go @@ -92,7 +92,7 @@ func (c *concurrentTopologyProvider) Init(tc vshardrouter.TopologyController) er removed[key] = added[key] delete(added, key) - _ = tc.RemoveReplicaset(ctx, key.UUID) + _ = tc.RemoveReplicaset(ctx, key.UUID.String()) default: panic("unreachable case") } diff --git a/tests/tnt/routermap_call_test.go b/tests/tnt/routermap_call_test.go deleted file mode 100644 index f005e27..0000000 --- a/tests/tnt/routermap_call_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package tnt - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - vshardrouter "github.com/tarantool/go-vshard-router" - "github.com/tarantool/go-vshard-router/providers/static" -) - -func TestRouterMapCall(t *testing.T) { - skipOnInvalidRun(t) - - t.Parallel() - - ctx := context.Background() - - cfg := getCfg() - - router, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{ - TopologyProvider: static.NewProvider(cfg), - DiscoveryTimeout: 5 * time.Second, - DiscoveryMode: vshardrouter.DiscoveryModeOn, - TotalBucketCount: totalBucketCount, - User: defaultTntUser, - Password: defaultTntPassword, - }) - - require.Nil(t, err, "NewRouter finished successfully") - - callOpts := vshardrouter.CallOpts{} - - const arg = "arg1" - - // Enusre that RouterMapCallRWImpl works at all - echoArgs := []interface{}{arg} - resp, err := router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) - require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") - - for k, v := range resp { - require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) - } - - echoArgs = []interface{}{1} - respInt, err := vshardrouter.RouterMapCallRW[int](router, ctx, "echo", echoArgs, vshardrouter.RouterMapCallRWOptions{}) - require.NoError(t, err, "RouterMapCallRW[int] echo finished with no err") - for k, v := range respInt { - require.Equalf(t, 1, v, "RouterMapCallRW[int] value ok for %v", k) - } - - // RouterMapCallRWImpl returns only one value - echoArgs = []interface{}{arg, "arg2"} - resp, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) - require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") - - for k, v := range resp { - require.Equalf(t, arg, v, "RouterMapCallRWImpl value ok for %v", k) - } - - // RouterMapCallRWImpl returns nil when no return value - noArgs := []interface{}{} - resp, err = router.RouterMapCallRWImpl(ctx, "echo", noArgs, callOpts) - require.NoError(t, err, "RouterMapCallRWImpl echo finished with no err") - - for k, v := range resp { - require.Equalf(t, nil, v, "RouterMapCallRWImpl value ok for %v", k) - } - - // Ensure that RouterMapCallRWImpl sends requests concurrently - const sleepToSec int = 1 - sleepArgs := []interface{}{sleepToSec} - - start := time.Now() - _, err = router.RouterMapCallRWImpl(ctx, "sleep", sleepArgs, vshardrouter.CallOpts{ - Timeout: 2 * time.Second, // because default timeout is 0.5 sec - }) - duration := time.Since(start) - - require.NoError(t, err, "RouterMapCallRWImpl sleep finished with no err") - require.Greater(t, len(cfg), 1, "There are more than one replicasets") - require.Less(t, duration, 1200*time.Millisecond, "Requests were send concurrently") - - // RouterMapCallRWImpl returns err on raise_luajit_error - _, err = router.RouterMapCallRWImpl(ctx, "raise_luajit_error", noArgs, callOpts) - require.NotNil(t, err, "RouterMapCallRWImpl raise_luajit_error finished with error") - - // RouterMapCallRWImpl invalid usage - _, err = router.RouterMapCallRWImpl(ctx, "echo", nil, callOpts) - require.NotNil(t, err, "RouterMapCallRWImpl with nil args finished with error") - - // Ensure that RouterMapCallRWImpl doesn't work when it mean't to - for k := range cfg { - errs := router.RemoveReplicaset(ctx, k.UUID) - require.Emptyf(t, errs, "%s successfully removed from router", k.UUID) - break - } - - _, err = router.RouterMapCallRWImpl(ctx, "echo", echoArgs, callOpts) - require.NotNilf(t, err, "RouterMapCallRWImpl failed on not full cluster") -} diff --git a/topology.go b/topology.go index 9e8870c..4f479fc 100644 --- a/topology.go +++ b/topology.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/tarantool/go-tarantool/v2" "github.com/tarantool/go-tarantool/v2/pool" ) @@ -19,33 +18,33 @@ var ( // This decision is made intentionally because there is no point in providing concurrence safety for this case. // In any case, a caller can use his own external synchronization primitive to handle concurrent access. type TopologyController interface { - AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error - RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error - RemoveInstance(ctx context.Context, rsID uuid.UUID, instanceName string) error + AddInstance(ctx context.Context, rsName string, info InstanceInfo) error + RemoveReplicaset(ctx context.Context, rsName string) []error + RemoveInstance(ctx context.Context, rsName, instanceName string) error AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error } -func (r *Router) getIDToReplicaset() map[uuid.UUID]*Replicaset { - r.idToReplicasetMutex.RLock() - idToReplicasetRef := r.idToReplicaset - r.idToReplicasetMutex.RUnlock() +func (r *Router) getNameToReplicaset() map[string]*Replicaset { + r.nameToReplicasetMutex.RLock() + nameToReplicasetRef := r.nameToReplicaset + r.nameToReplicasetMutex.RUnlock() - return idToReplicasetRef + return nameToReplicasetRef } -func (r *Router) setIDToReplicaset(idToReplicasetNew map[uuid.UUID]*Replicaset) { - r.idToReplicasetMutex.Lock() - r.idToReplicaset = idToReplicasetNew - r.idToReplicasetMutex.Unlock() +func (r *Router) setNameToReplicaset(nameToReplicasetNew map[string]*Replicaset) { + r.nameToReplicasetMutex.Lock() + r.nameToReplicaset = nameToReplicasetNew + r.nameToReplicasetMutex.Unlock() } func (r *Router) Topology() TopologyController { return r } -func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceInfo) error { - r.log().Debugf(ctx, "Trying to add instance %s to router topology in rs %s", info, rsID) +func (r *Router) AddInstance(ctx context.Context, rsName string, info InstanceInfo) error { + r.log().Debugf(ctx, "Trying to add instance %s to router topology in rs %s", info, rsName) err := info.Validate() if err != nil { @@ -62,9 +61,9 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI Opts: r.cfg.PoolOpts, } - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() - rs := idToReplicasetRef[rsID] + rs := nameToReplicasetRef[rsName] if rs == nil { return ErrReplicasetNotExists } @@ -72,12 +71,12 @@ func (r *Router) AddInstance(ctx context.Context, rsID uuid.UUID, info InstanceI return rs.conn.Add(ctx, instance) } -func (r *Router) RemoveInstance(ctx context.Context, rsID uuid.UUID, instanceName string) error { - r.log().Debugf(ctx, "Trying to remove instance %s from router topology in rs %s", instanceName, rsID) +func (r *Router) RemoveInstance(ctx context.Context, rsName, instanceName string) error { + r.log().Debugf(ctx, "Trying to remove instance %s from router topology in rs %s", instanceName, rsName) - idToReplicasetRef := r.getIDToReplicaset() + nameToReplicasetRef := r.getNameToReplicaset() - rs := idToReplicasetRef[rsID] + rs := nameToReplicasetRef[rsName] if rs == nil { return ErrReplicasetNotExists } @@ -88,24 +87,14 @@ func (r *Router) RemoveInstance(ctx context.Context, rsID uuid.UUID, instanceNam func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error { r.log().Debugf(ctx, "Trying to add replicaset %s to router topology", rsInfo) - idToReplicasetOld := r.getIDToReplicaset() - - // tarantool 3+ configuration does not require uuid - if rsInfo.UUID == uuid.Nil { - // check that such replicaset does not exist - for _, rs := range idToReplicasetOld { - if rs.info.Name == rsInfo.Name { - return ErrReplicasetExists - } - } - - // we just mock this uuid value - rsInfo.UUID = uuid.New() - - r.log().Warnf(ctx, "replicaset %s is assigned uuid %s; this is a temporary need to migrate from uuid to names", rsInfo.Name, rsInfo.UUID) + err := rsInfo.Validate() + if err != nil { + return err } - if _, ok := idToReplicasetOld[rsInfo.UUID]; ok { + nameToReplicasetOld := r.getNameToReplicaset() + + if _, ok := nameToReplicasetOld[rsInfo.Name]; ok { return ErrReplicasetExists } @@ -153,17 +142,17 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta replicaset.conn = conn // Create an entirely new map object - idToReplicasetNew := make(map[uuid.UUID]*Replicaset) - for k, v := range idToReplicasetOld { - idToReplicasetNew[k] = v + nameToReplicasetNew := make(map[string]*Replicaset) + for k, v := range nameToReplicasetOld { + nameToReplicasetNew[k] = v } - idToReplicasetNew[rsInfo.UUID] = replicaset // add when conn is ready + nameToReplicasetNew[rsInfo.Name] = replicaset // add when conn is ready // We could detect concurrent access to the TopologyController interface // by comparing references to r.idToReplicaset and idToReplicasetOld. // But it requires reflection which I prefer to avoid. // See: https://stackoverflow.com/questions/58636694/how-to-know-if-2-go-maps-reference-the-same-data. - r.setIDToReplicaset(idToReplicasetNew) + r.setNameToReplicaset(nameToReplicasetNew) return nil } @@ -183,24 +172,24 @@ func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetI return nil } -func (r *Router) RemoveReplicaset(ctx context.Context, rsID uuid.UUID) []error { - r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsID) +func (r *Router) RemoveReplicaset(ctx context.Context, rsName string) []error { + r.log().Debugf(ctx, "Trying to remove replicaset %s from router topology", rsName) - idToReplicasetOld := r.getIDToReplicaset() + nameToReplicasetOld := r.getNameToReplicaset() - rs := idToReplicasetOld[rsID] + rs := nameToReplicasetOld[rsName] if rs == nil { return []error{ErrReplicasetNotExists} } // Create an entirely new map object - idToReplicasetNew := make(map[uuid.UUID]*Replicaset) - for k, v := range idToReplicasetOld { - idToReplicasetNew[k] = v + nameToReplicasetNew := make(map[string]*Replicaset) + for k, v := range nameToReplicasetOld { + nameToReplicasetNew[k] = v } - delete(idToReplicasetNew, rsID) + delete(nameToReplicasetNew, rsName) - r.setIDToReplicaset(idToReplicasetNew) + r.setNameToReplicaset(nameToReplicasetNew) return rs.conn.CloseGraceful() } diff --git a/topology_test.go b/topology_test.go index f3f17ac..4611068 100644 --- a/topology_test.go +++ b/topology_test.go @@ -21,13 +21,13 @@ func TestController_AddInstance(t *testing.T) { t.Run("no such replicaset", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + nameToReplicaset: map[string]*Replicaset{}, cfg: Config{ Loggerf: emptyLogfProvider, }, } - err := router.Topology().AddInstance(ctx, uuid.New(), InstanceInfo{ + err := router.Topology().AddInstance(ctx, uuid.New().String(), InstanceInfo{ Addr: "127.0.0.1:8060", Name: "instance_001", }) @@ -36,13 +36,13 @@ func TestController_AddInstance(t *testing.T) { t.Run("invalid instance info", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + nameToReplicaset: map[string]*Replicaset{}, cfg: Config{ Loggerf: emptyLogfProvider, }, } - err := router.Topology().AddInstance(ctx, uuid.New(), InstanceInfo{}) + err := router.Topology().AddInstance(ctx, uuid.New().String(), InstanceInfo{}) require.True(t, errors.Is(err, ErrInvalidInstanceInfo)) }) } @@ -52,13 +52,13 @@ func TestController_RemoveInstance(t *testing.T) { t.Run("no such replicaset", func(t *testing.T) { router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{}, + nameToReplicaset: map[string]*Replicaset{}, cfg: Config{ Loggerf: emptyLogfProvider, }, } - err := router.Topology().RemoveInstance(ctx, uuid.New(), "") + err := router.Topology().RemoveInstance(ctx, uuid.New().String(), "") require.True(t, errors.Is(err, ErrReplicasetNotExists)) }) } @@ -71,8 +71,8 @@ func TestController_RemoveReplicaset(t *testing.T) { mPool.On("CloseGraceful").Return(nil) router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{ - uuidToRemove: {conn: mPool}, + nameToReplicaset: map[string]*Replicaset{ + uuidToRemove.String(): {conn: mPool}, }, cfg: Config{ Loggerf: emptyLogfProvider, @@ -81,12 +81,12 @@ func TestController_RemoveReplicaset(t *testing.T) { t.Run("no such replicaset", func(t *testing.T) { t.Parallel() - errs := router.Topology().RemoveReplicaset(ctx, uuid.New()) + errs := router.Topology().RemoveReplicaset(ctx, uuid.New().String()) require.True(t, errors.Is(errs[0], ErrReplicasetNotExists)) }) t.Run("successfully remove", func(t *testing.T) { t.Parallel() - errs := router.Topology().RemoveReplicaset(ctx, uuidToRemove) + errs := router.Topology().RemoveReplicaset(ctx, uuidToRemove.String()) require.Empty(t, errs) }) } @@ -94,11 +94,11 @@ func TestController_RemoveReplicaset(t *testing.T) { func TestRouter_AddReplicaset_AlreadyExists(t *testing.T) { ctx := context.TODO() - alreadyExistingRsUUID := uuid.New() + alreadyExistingRsName := uuid.New().String() router := Router{ - idToReplicaset: map[uuid.UUID]*Replicaset{ - alreadyExistingRsUUID: {}, + nameToReplicaset: map[string]*Replicaset{ + alreadyExistingRsName: {conn: nil}, }, cfg: Config{ Loggerf: emptyLogfProvider, @@ -106,6 +106,6 @@ func TestRouter_AddReplicaset_AlreadyExists(t *testing.T) { } // Test that such replicaset already exists - err := router.AddReplicaset(ctx, ReplicasetInfo{UUID: alreadyExistingRsUUID}, []InstanceInfo{}) + err := router.AddReplicaset(ctx, ReplicasetInfo{Name: alreadyExistingRsName}, []InstanceInfo{}) require.Equalf(t, ErrReplicasetExists, err, "such replicaset must already exists") } diff --git a/vshard.go b/vshard.go index d90e581..57842a6 100644 --- a/vshard.go +++ b/vshard.go @@ -15,9 +15,14 @@ import ( ) var ( - ErrInvalidConfig = fmt.Errorf("config invalid") + // ErrInvalidConfig is returned when the configuration is invalid. + ErrInvalidConfig = fmt.Errorf("config invalid") + // ErrInvalidInstanceInfo is returned when the instance information is invalid. ErrInvalidInstanceInfo = fmt.Errorf("invalid instance info") - ErrTopologyProvider = fmt.Errorf("got error from topology provider") + // ErrInvalidReplicasetInfo is returned when the replicaset information is invalid. + ErrInvalidReplicasetInfo = fmt.Errorf("invalid replicaset info") + // ErrTopologyProvider is returned when there is an error from the topology provider. + ErrTopologyProvider = fmt.Errorf("got error from topology provider") ) // This data struct is instroduced by https://github.com/tarantool/go-vshard-router/issues/39. @@ -42,15 +47,15 @@ type consistentView struct { type Router struct { cfg Config - // idToReplicasetMutex guards not the map itself, but the variable idToReplicaset. - // idToReplicaset is an immutable object by our convention. + // nameToReplicasetMutex guards not the map itself, but the variable idToReplicaset. + // nameToReplicaset is an immutable object by our convention. // Whenever we add or remove a replicaset, we create a new map object. - // idToReplicaset can be modified only by TopologyController methods. + // nameToReplicaset can be modified only by TopologyController methods. // Assuming that we rarely add or remove some replicaset, // it should be the simplest and most efficient way of handling concurrent access. // Additionally, we can safely iterate over a map because it never changes. - idToReplicasetMutex sync.RWMutex - idToReplicaset map[uuid.UUID]*Replicaset + nameToReplicasetMutex sync.RWMutex + nameToReplicaset map[string]*Replicaset viewMutex sync.RWMutex view *consistentView @@ -220,8 +225,8 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { } router := &Router{ - cfg: cfg, - idToReplicaset: make(map[uuid.UUID]*Replicaset), + cfg: cfg, + nameToReplicaset: make(map[string]*Replicaset), view: &consistentView{ routeMap: make([]atomic.Pointer[Replicaset], cfg.TotalBucketCount+1), }, @@ -254,10 +259,10 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) { } // BucketSet Set a bucket to a replicaset. -func (r *Router) BucketSet(bucketID uint64, rsID uuid.UUID) (*Replicaset, error) { - idToReplicasetRef := r.getIDToReplicaset() +func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error) { + nameToReplicasetRef := r.getNameToReplicaset() - rs := idToReplicasetRef[rsID] + rs := nameToReplicasetRef[rsName] if rs == nil { return nil, newVShardErrorNoRouteToBucket(bucketID) } @@ -399,10 +404,10 @@ func (r *Router) RouterBucketCount() uint64 { // error will result in an immediate return, ensuring that the operation either // succeeds fully or fails fast. func (r *Router) ClusterBootstrap(ctx context.Context, ifNotBootstrapped bool) error { - rssToBootstrap := make([]Replicaset, 0, len(r.idToReplicaset)) + rssToBootstrap := make([]Replicaset, 0, len(r.nameToReplicaset)) var lastErr error - for _, rs := range r.idToReplicaset { + for _, rs := range r.nameToReplicaset { rssToBootstrap = append(rssToBootstrap, *rs) }