Skip to content

Commit

Permalink
Make replicaset name the required identifier, UUID is now optional.
Browse files Browse the repository at this point in the history
- Replaced UUID with Name as the primary required identifier for replicaset.
- Added backward compatibility for VShard errors by searching for both UUID and Name.
- Updated error handling to accommodate new version of VShard that supports name-based identification.
  • Loading branch information
maksim.konovalov authored and KaymeKaydex committed Jan 13, 2025
1 parent 2ea04f8 commit a284589
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 245 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CHANGES:
* VshardRouterCallMode type renamed to CallMode for simplicity.
* StorageResultTypedFunc type removed as useless type.
* Updated msgpack version from v5.3.5 to v5.4.1.
* The replicaset identifier is now the replicaset name instead of the UUID.

TESTS:

Expand All @@ -19,6 +20,7 @@ TESTS:
* Moved TestReplicasetReplicaCall and Go benches from tests/tnt to tarantool_test.go .
* TestRouterCallProto rewritten.
* Started using constants in tarantool_test.go instead of duplicate variables.
* TestRouterMapCall moved to tarantool_test.go and renamed to TestRouter_RouterMapCallRWImpl.

## v1.3.2

Expand Down
81 changes: 44 additions & 37 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"time"

"github.com/google/uuid"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/pool"
"github.com/vmihailenco/msgpack/v5"
Expand Down Expand Up @@ -313,27 +311,35 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
// We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
r.BucketReset(bucketID)

if vshardError.Destination != "" {
destinationUUID, err := uuid.Parse(vshardError.Destination)
if err != nil {
return VshardRouterCallResp{}, fmt.Errorf("protocol violation %s: malformed destination %w: %w",
vshardStorageClientCall, vshardError, err)
}

if destination := vshardError.Destination; destination != "" {
var loggedOnce bool
for {
idToReplicasetRef := r.getIDToReplicaset()
if _, ok := idToReplicasetRef[destinationUUID]; ok {
_, err := r.BucketSet(bucketID, destinationUUID)
nameToReplicasetRef := r.getNameToReplicaset()

_, destinationExists := nameToReplicasetRef[destination]

if !destinationExists {
// Backward compatibility: older vshard versions report the destination as a UUID,
// so if no replicaset has this name, also try to match the destination against replicaset UUIDs.
for _, rsRef := range nameToReplicasetRef {
if rsRef.info.UUID.String() == destination {
destinationExists = true
break
}
}
}

if destinationExists {
_, err := r.BucketSet(bucketID, destination)
if err == nil {
break // breaks loop
}
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationUUID, err)
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destination, err)
}

if !loggedOnce {
r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
"update configuration", destinationUUID)
"update configuration", destination)
loggedOnce = true
}

Expand Down Expand Up @@ -543,13 +549,14 @@ func (r *Router) RouterMapCallRWImpl(
fnc string,
args interface{},
opts CallOpts,
) (map[uuid.UUID]interface{}, error) {
) (map[string]interface{}, error) {
// nolint:gosimple
return RouterMapCallRW[interface{}](r, ctx, fnc, args, RouterMapCallRWOptions{Timeout: opts.Timeout})
}

type replicasetFuture struct {
uuid uuid.UUID
// replicaset name
name string
future *tarantool.Future
}

Expand All @@ -561,7 +568,7 @@ type replicasetFuture struct {
// see: https://github.com/golang/go/issues/49085.
func RouterMapCallRW[T any](r *Router, ctx context.Context,
fnc string, args interface{}, opts RouterMapCallRWOptions,
) (map[uuid.UUID]T, error) {
) (map[string]T, error) {
const vshardStorageServiceCall = "vshard.storage._call"

timeout := callTimeoutDefault
Expand All @@ -572,14 +579,14 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
timeStart := time.Now()
refID := r.refID.Add(1)

idToReplicasetRef := r.getIDToReplicaset()
nameToReplicasetRef := r.getNameToReplicaset()

defer func() {
// call function "storage_unref" whether map_callrw has failed or succeeded
storageUnrefReq := tarantool.NewCallRequest(vshardStorageServiceCall).
Args([]interface{}{"storage_unref", refID})

for _, rs := range idToReplicasetRef {
for _, rs := range nameToReplicasetRef {
future := rs.conn.Do(storageUnrefReq, pool.RW)
future.SetError(nil) // TODO: does it cancel the request above or not?
}
Expand All @@ -594,12 +601,12 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
Context(ctx).
Args([]interface{}{"storage_ref", refID, timeout})

var rsFutures = make([]replicasetFuture, 0, len(idToReplicasetRef))
var rsFutures = make([]replicasetFuture, 0, len(nameToReplicasetRef))

// ref stage: send concurrent ref requests
for uuid, rs := range idToReplicasetRef {
for name, rs := range nameToReplicasetRef {
rsFutures = append(rsFutures, replicasetFuture{
uuid: uuid,
name: name,
future: rs.conn.Do(storageRefReq, pool.RW),
})
}
Expand All @@ -612,11 +619,11 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
var storageRefResponse storageRefResponseProto

if err := rsFuture.future.GetTyped(&storageRefResponse); err != nil {
return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.uuid, err)
return nil, fmt.Errorf("rs {%s} storage_ref err: %v", rsFuture.name, err)
}

if storageRefResponse.err != nil {
return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.uuid, storageRefResponse.err)
return nil, fmt.Errorf("storage_ref failed on %v: %v", rsFuture.name, storageRefResponse.err)
}

totalBucketCount += storageRefResponse.bucketCount
Expand All @@ -636,33 +643,33 @@ func RouterMapCallRW[T any](r *Router, ctx context.Context,
rsFutures = rsFutures[0:0]

// map stage: send concurrent map requests
for uuid, rs := range idToReplicasetRef {
for name, rs := range nameToReplicasetRef {
rsFutures = append(rsFutures, replicasetFuture{
uuid: uuid,
name: name,
future: rs.conn.Do(storageMapReq, pool.RW),
})
}

// map stage: get their responses
idToResult := make(map[uuid.UUID]T)
nameToResult := make(map[string]T)
for _, rsFuture := range rsFutures {
var storageMapResponse storageMapResponseProto[T]

err := rsFuture.future.GetTyped(&storageMapResponse)
if err != nil {
return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.uuid, err)
return nil, fmt.Errorf("rs {%s} storage_map err: %v", rsFuture.name, err)
}

if !storageMapResponse.ok {
return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.uuid, storageMapResponse.err)
return nil, fmt.Errorf("storage_map failed on %v: %+v", rsFuture.name, storageMapResponse.err)
}

idToResult[rsFuture.uuid] = storageMapResponse.value
nameToResult[rsFuture.name] = storageMapResponse.value
}

r.metrics().RequestDuration(time.Since(timeStart), true, true)

return idToResult, nil
return nameToResult, nil
}

// RouterRoute get replicaset object by bucket identifier.
Expand All @@ -672,15 +679,15 @@ func (r *Router) RouterRoute(ctx context.Context, bucketID uint64) (*Replicaset,
}

// RouterRouteAll return map of all replicasets.
func (r *Router) RouterRouteAll() map[uuid.UUID]*Replicaset {
idToReplicasetRef := r.getIDToReplicaset()
func (r *Router) RouterRouteAll() map[string]*Replicaset {
nameToReplicasetRef := r.getNameToReplicaset()

// Do not expose the original map to prevent unauthorized modification.
idToReplicasetCopy := make(map[uuid.UUID]*Replicaset)
nameToReplicasetCopy := make(map[string]*Replicaset)

for k, v := range idToReplicasetRef {
idToReplicasetCopy[k] = v
for k, v := range nameToReplicasetRef {
nameToReplicasetCopy[k] = v
}

return idToReplicasetCopy
return nameToReplicasetCopy
}
33 changes: 16 additions & 17 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool/v2"
)

Expand Down Expand Up @@ -70,18 +69,18 @@ func (r *Router) BucketDiscovery(ctx context.Context, bucketID uint64) (*Replica
}

func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
nameToReplicasetRef := r.getNameToReplicaset()

type rsFuture struct {
rsID uuid.UUID
rsName string
future *tarantool.Future
}

var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef))
var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef))
// Send a bunch of parallel requests
for rsID, rs := range idToReplicasetRef {
for rsName, rs := range nameToReplicasetRef {
rsFutures = append(rsFutures, rsFuture{
rsID: rsID,
rsName: rsName,
future: rs.bucketStatAsync(ctx, bucketID),
})
}
Expand All @@ -90,16 +89,16 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
if _, err := bucketStatWait(rsFuture.future); err != nil {
var vshardError StorageCallVShardError
if !errors.As(err, &vshardError) {
r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsID, err)
r.log().Errorf(ctx, "bucketSearchLegacy: bucketStatWait call error for %v: %v", rsFuture.rsName, err)
}
// just skip, bucket may not belong to this replicaset
continue
}

// It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
rs, err := r.BucketSet(bucketID, rsFuture.rsID)
rs, err := r.BucketSet(bucketID, rsFuture.rsName)
if err != nil {
r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsID, bucketID, err)
r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsName, bucketID, err)
return nil, newVShardErrorNoRouteToBucket(bucketID)
}

Expand Down Expand Up @@ -127,21 +126,21 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
idToReplicasetRef := r.getIDToReplicaset()
nameToReplicasetRef := r.getNameToReplicaset()
view := r.getConsistentView()

type rsFuture struct {
rs *Replicaset
rsID uuid.UUID
rsName string
future *tarantool.Future
}

var rsFutures = make([]rsFuture, 0, len(idToReplicasetRef))
var rsFutures = make([]rsFuture, 0, len(nameToReplicasetRef))
// Send a bunch of parallel requests
for rsID, rs := range idToReplicasetRef {
for rsName, rs := range nameToReplicasetRef {
rsFutures = append(rsFutures, rsFuture{
rs: rs,
rsID: rsID,
rsName: rsName,
future: rs.bucketsDiscoveryAsync(ctx, bucketIDToFind),
})
}
Expand All @@ -151,7 +150,7 @@ func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64)
for _, rsFuture := range rsFutures {
resp, err := bucketsDiscoveryWait(rsFuture.future)
if err != nil {
r.log().Errorf(ctx, "bucketSearchBatched: bucketsDiscoveryWait error for %v: %v", rsFuture.rsID, err)
r.log().Errorf(ctx, "bucketSearchBatched: bucketsDiscoveryWait error for %v: %v", rsFuture.rsName, err)
// just skip, we still may find our bucket in another replicaset
continue
}
Expand Down Expand Up @@ -231,9 +230,9 @@ func (r *Router) DiscoveryAllBuckets(ctx context.Context) error {
errGr, ctx := errgroup.WithContext(ctx)

view := r.getConsistentView()
idToReplicasetRef := r.getIDToReplicaset()
nameToReplicasetRef := r.getNameToReplicaset()

for _, rs := range idToReplicasetRef {
for _, rs := range nameToReplicasetRef {
rs := rs

errGr.Go(func() error {
Expand Down
31 changes: 25 additions & 6 deletions replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,35 @@ import (
"github.com/vmihailenco/msgpack/v5/msgpcode"
)

// ReplicasetInfo represents information about a replicaset, including its name, unique identifier, weight, and state.
type ReplicasetInfo struct {
Name string
UUID uuid.UUID
Weight float64
PinnedCount uint64
// Name — the name of the replicaset.
// This string is required and is used to identify the replicaset.
Name string
// UUID — the unique identifier of the replicaset.
// This is an optional value that can be used to uniquely distinguish each replicaset.
UUID uuid.UUID
// Weight — the weight of the replicaset.
// This floating-point number may be used to determine the importance or priority of the replicaset.
Weight float64
// PinnedCount — the number of pinned items.
// This value indicates how many items or tasks are associated with the replicaset.
PinnedCount uint64
// IgnoreDisbalance — a flag indicating whether to ignore load imbalance when distributing tasks.
// If true, the replicaset will be excluded from imbalance checks.
IgnoreDisbalance bool
}

func (rsi ReplicasetInfo) String() string {
return fmt.Sprintf("{name: %s, uuid: %s}", rsi.Name, rsi.UUID)
// Validate checks that the replicaset info carries its required identifier.
// It returns an error wrapping ErrInvalidReplicasetInfo when Name is empty,
// and nil otherwise. No other field is inspected.
func (ri ReplicasetInfo) Validate() error {
	// A non-empty name is the only requirement, so accept it right away.
	if ri.Name != "" {
		return nil
	}

	return fmt.Errorf("%w: rsInfo.Name is empty", ErrInvalidReplicasetInfo)
}

// String renders the replicaset info in the form "{name: <Name>, uuid: <UUID>}".
func (ri ReplicasetInfo) String() string {
	const layout = "{name: %s, uuid: %s}"

	return fmt.Sprintf(layout, ri.Name, ri.UUID)
}

type ReplicasetCallOpts struct {
Expand Down
Loading

0 comments on commit a284589

Please sign in to comment.