Skip to content

Commit

Permalink
Refactor common revision code to pkg
Browse files Browse the repository at this point in the history
Signed-off-by: Allen Ray <[email protected]>
  • Loading branch information
dusk125 committed Jul 18, 2023
1 parent 35628b9 commit b230d3b
Show file tree
Hide file tree
Showing 19 changed files with 479 additions and 528 deletions.
53 changes: 0 additions & 53 deletions etcdutl/snapshot/util.go

This file was deleted.

31 changes: 16 additions & 15 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/snapshot"
"go.etcd.io/etcd/pkg/v3/revision"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
Expand Down Expand Up @@ -157,8 +158,8 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("cannot write to bucket %s", err.Error())
}
if iskeyb {
rev := bytesToRev(k)
ds.Revision = rev.main
rev := revision.BytesToRev(k)
ds.Revision = rev.Main
}
ds.TotalKey++
return nil
Expand Down Expand Up @@ -353,36 +354,36 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
return nil
}

func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision {
func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision.Revision, amount int64) revision.Revision {
s.lg.Info(
"bumping latest revision",
zap.Int64("latest-revision", latest.main),
zap.Int64("latest-revision", latest.Main),
zap.Int64("bump-amount", amount),
zap.Int64("new-latest-revision", latest.main+amount),
zap.Int64("new-latest-revision", latest.Main+amount),
)

latest.main += amount
latest.sub = 0
k := make([]byte, 17)
revToBytes(k, latest)
latest.Main += amount
latest.Sub = 0
k := revision.NewRevBytes()
revision.RevToBytes(latest, k)
tx.UnsafePut(schema.Key, k, []byte{})

return latest
}

func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) {
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision.Revision) {
s.lg.Info(
"marking revision compacted",
zap.Int64("revision", latest.main),
zap.Int64("revision", latest.Main),
)

mvcc.UnsafeSetScheduledCompact(tx, latest.main)
mvcc.UnsafeSetScheduledCompact(tx, latest.Main)
}

func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) {
var latest revision
func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision.Revision, error) {
var latest revision.Revision
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
rev := bytesToRev(k)
rev := revision.BytesToRev(k)

if rev.GreaterThan(latest) {
latest = rev
Expand Down
62 changes: 62 additions & 0 deletions pkg/revision/revision.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package revision

import "encoding/binary"

// RevBytesLen is the byte length of a normal revision.
// First 8 bytes is the revision.main in big-endian format. The 9th byte
// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
const RevBytesLen = 8 + 1 + 8
const MarkedRevBytesLen = RevBytesLen + 1

// Revision indicates modification of the key-value space.
// The set of changes that share same main Revision changes the key-value space atomically.
type Revision struct {
// Main is the Main revision of a set of changes that happen atomically.
Main int64

// Sub is the Sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing Sub revision in that
// set.
Sub int64
}

func (a Revision) GreaterThan(b Revision) bool {
if a.Main > b.Main {
return true
}
if a.Main < b.Main {
return false
}
return a.Sub > b.Sub
}

func NewRevBytes() []byte {
return make([]byte, RevBytesLen, MarkedRevBytesLen)
}

func RevToBytes(rev Revision, bytes []byte) {
binary.BigEndian.PutUint64(bytes, uint64(rev.Main))
bytes[8] = '_'
binary.BigEndian.PutUint64(bytes[9:], uint64(rev.Sub))
}

func BytesToRev(bytes []byte) Revision {
return Revision{
Main: int64(binary.BigEndian.Uint64(bytes[0:8])),
Sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc
package revision

import (
"bytes"
Expand All @@ -25,22 +25,22 @@ import (
// bytes slice. Moreover, the lexicographical order of its byte slice representation
// follows the order of (main, sub).
func TestRevision(t *testing.T) {
tests := []revision{
tests := []Revision{
// order in (main, sub)
{},
{main: 1, sub: 0},
{main: 1, sub: 1},
{main: 2, sub: 0},
{main: math.MaxInt64, sub: math.MaxInt64},
{Main: 1, Sub: 0},
{Main: 1, Sub: 1},
{Main: 2, Sub: 0},
{Main: math.MaxInt64, Sub: math.MaxInt64},
}

bs := make([][]byte, len(tests))
for i, tt := range tests {
b := newRevBytes()
revToBytes(tt, b)
b := NewRevBytes()
RevToBytes(tt, b)
bs[i] = b

if grev := bytesToRev(b); !reflect.DeepEqual(grev, tt) {
if grev := BytesToRev(b); !reflect.DeepEqual(grev, tt) {
t.Errorf("#%d: revision = %+v, want %+v", i, grev, tt)
}
}
Expand Down
13 changes: 7 additions & 6 deletions server/storage/mvcc/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.uber.org/zap"

"go.etcd.io/etcd/pkg/v3/revision"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
)
Expand All @@ -30,7 +31,7 @@ const (
hashStorageMaxSize = 10
)

func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision.Revision]struct{}) (KeyValueHash, error) {
h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v)
Expand All @@ -43,10 +44,10 @@ type kvHasher struct {
hash hash.Hash32
compactRevision int64
revision int64
keep map[revision]struct{}
keep map[revision.Revision]struct{}
}

func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
func newKVHasher(compactRev, rev int64, keep map[revision.Revision]struct{}) kvHasher {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(schema.Key.Name())
return kvHasher{
Expand All @@ -58,12 +59,12 @@ func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
}

func (h *kvHasher) WriteKeyValue(k, v []byte) {
kr := bytesToRev(k)
upper := revision{main: h.revision + 1}
kr := revision.BytesToRev(k)
upper := revision.Revision{Main: h.revision + 1}
if !upper.GreaterThan(kr) {
return
}
lower := revision{main: h.compactRevision + 1}
lower := revision.Revision{Main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(h.keep) > 0 {
Expand Down
Loading

0 comments on commit b230d3b

Please sign in to comment.