Skip to content

Commit

Permalink
Refactor common revision code to pkg
Browse files Browse the repository at this point in the history
Signed-off-by: Allen Ray <[email protected]>
  • Loading branch information
dusk125 committed Sep 28, 2023
1 parent 55253ce commit c0f09b3
Show file tree
Hide file tree
Showing 18 changed files with 468 additions and 560 deletions.
53 changes: 0 additions & 53 deletions etcdutl/snapshot/util.go

This file was deleted.

32 changes: 16 additions & 16 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
return fmt.Errorf("cannot write to bucket %s", herr.Error())
}
if iskeyb {
rev := bytesToRev(k)
ds.Revision = rev.main
rev := mvcc.BytesToRev(k)
ds.Revision = rev.Main
}
ds.TotalKey++
return nil
Expand Down Expand Up @@ -352,38 +352,38 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
return nil
}

func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest revision, amount int64) revision {
func (s *v3Manager) unsafeBumpRevision(tx backend.UnsafeWriter, latest mvcc.BucketKey, amount int64) mvcc.BucketKey {
s.lg.Info(
"bumping latest revision",
zap.Int64("latest-revision", latest.main),
zap.Int64("latest-revision", latest.Main),
zap.Int64("bump-amount", amount),
zap.Int64("new-latest-revision", latest.main+amount),
zap.Int64("new-latest-revision", latest.Main+amount),
)

latest.main += amount
latest.sub = 0
k := make([]byte, 17)
revToBytes(k, latest)
latest.Main += amount
latest.Sub = 0
k := mvcc.NewRevBytes()
k = mvcc.RevToBytes(latest, k)
tx.UnsafePut(schema.Key, k, []byte{})

return latest
}

func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest revision) {
func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.UnsafeWriter, latest mvcc.BucketKey) {
s.lg.Info(
"marking revision compacted",
zap.Int64("revision", latest.main),
zap.Int64("revision", latest.Main),
)

mvcc.UnsafeSetScheduledCompact(tx, latest.main)
mvcc.UnsafeSetScheduledCompact(tx, latest.Main)
}

func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (revision, error) {
var latest revision
func (s *v3Manager) unsafeGetLatestRevision(tx backend.UnsafeReader) (mvcc.BucketKey, error) {
var latest mvcc.BucketKey
err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) {
rev := bytesToRev(k)
rev := mvcc.BytesToRev(k)

if rev.GreaterThan(latest) {
if rev.GreaterThan(latest.Revision) {
latest = rev
}

Expand Down
16 changes: 8 additions & 8 deletions server/storage/mvcc/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
hashStorageMaxSize = 10
)

func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[BucketKey]struct{}) (KeyValueHash, error) {
h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v)
Expand All @@ -43,10 +43,10 @@ type kvHasher struct {
hash hash.Hash32
compactRevision int64
revision int64
keep map[revision]struct{}
keep map[BucketKey]struct{}
}

func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
func newKVHasher(compactRev, rev int64, keep map[BucketKey]struct{}) kvHasher {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(schema.Key.Name())
return kvHasher{
Expand All @@ -58,15 +58,15 @@ func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
}

func (h *kvHasher) WriteKeyValue(k, v []byte) {
kr := bytesToRev(k)
upper := revision{main: h.revision + 1}
if !upper.GreaterThan(kr) {
kr := BytesToRev(k)
upper := Revision{Main: h.revision + 1}
if !upper.GreaterThan(kr.Revision) {
return
}
lower := revision{main: h.compactRevision + 1}
lower := Revision{Main: h.compactRevision + 1}
// skip revisions that are scheduled for deletion
// due to compacting; don't skip if there isn't one.
if lower.GreaterThan(kr) && len(h.keep) > 0 {
if lower.GreaterThan(kr.Revision) && len(h.keep) > 0 {
if _, ok := h.keep[kr]; !ok {
return
}
Expand Down
46 changes: 23 additions & 23 deletions server/storage/mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
)

type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
Get(key []byte, atRev int64) (rev, created BucketKey, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []BucketKey)
Revisions(key, end []byte, atRev int64, limit int) ([]BucketKey, int)
CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Put(key []byte, rev BucketKey)
Tombstone(key []byte, rev BucketKey) error
Compact(rev int64) map[BucketKey]struct{}
Keep(rev int64) map[BucketKey]struct{}
Equal(b index) bool

Insert(ki *keyIndex)
Expand All @@ -51,30 +51,30 @@ func newTreeIndex(lg *zap.Logger) index {
}
}

func (ti *treeIndex) Put(key []byte, rev revision) {
func (ti *treeIndex) Put(key []byte, rev BucketKey) {
keyi := &keyIndex{key: key}

ti.Lock()
defer ti.Unlock()
okeyi, ok := ti.tree.Get(keyi)
if !ok {
keyi.put(ti.lg, rev.main, rev.sub)
keyi.put(ti.lg, rev.Main, rev.Sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi.put(ti.lg, rev.main, rev.sub)
okeyi.put(ti.lg, rev.Main, rev.Sub)
}

func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created BucketKey, ver int64, err error) {
ti.RLock()
defer ti.RUnlock()
return ti.unsafeGet(key, atRev)
}

func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created BucketKey, ver int64, err error) {
keyi := &keyIndex{key: key}
if keyi = ti.keyIndex(keyi); keyi == nil {
return revision{}, revision{}, 0, ErrRevisionNotFound
return BucketKey{}, BucketKey{}, 0, ErrRevisionNotFound
}
return keyi.get(ti.lg, atRev)
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
// Revisions returns limited number of revisions from key(included) to end(excluded)
// at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0.
// The second return parameter isn't capped by the limit and reflects the total number of revisions.
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []BucketKey, total int) {
ti.RLock()
defer ti.RUnlock()

Expand All @@ -118,7 +118,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []
if err != nil {
return nil, 0
}
return []revision{rev}, 1
return []BucketKey{rev}, 1
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
Expand Down Expand Up @@ -155,7 +155,7 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int {
return total
}

func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []BucketKey) {
ti.RLock()
defer ti.RUnlock()

Expand All @@ -164,7 +164,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
if err != nil {
return nil, nil
}
return [][]byte{key}, []revision{rev}
return [][]byte{key}, []BucketKey{rev}
}
ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
Expand All @@ -176,7 +176,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []
return keys, revs
}

func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
func (ti *treeIndex) Tombstone(key []byte, rev BucketKey) error {
keyi := &keyIndex{key: key}

ti.Lock()
Expand All @@ -186,11 +186,11 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
return ErrRevisionNotFound
}

return ki.tombstone(ti.lg, rev.main, rev.sub)
return ki.tombstone(ti.lg, rev.Main, rev.Sub)
}

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
func (ti *treeIndex) Compact(rev int64) map[BucketKey]struct{} {
available := make(map[BucketKey]struct{})
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
ti.Lock()
clone := ti.tree.Clone()
Expand All @@ -214,8 +214,8 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
}

// Keep finds all revisions to be kept for a Compaction at the given rev.
func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
func (ti *treeIndex) Keep(rev int64) map[BucketKey]struct{} {
available := make(map[BucketKey]struct{})
ti.RLock()
defer ti.RUnlock()
ti.tree.Ascend(func(keyi *keyIndex) bool {
Expand Down
6 changes: 3 additions & 3 deletions server/storage/mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func benchmarkIndexCompact(b *testing.B, size int) {
bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], newRevisionTombstone(int64(i), int64(i), false))
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
Expand All @@ -49,7 +49,7 @@ func BenchmarkIndexPut(b *testing.B) {
keys := createBytesSlice(bytesN, b.N)
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], newRevisionTombstone(int64(i), int64(i), false))
}
}

Expand All @@ -60,7 +60,7 @@ func BenchmarkIndexGet(b *testing.B) {
bytesN := 64
keys := createBytesSlice(bytesN, b.N)
for i := 1; i < b.N; i++ {
kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
kvindex.Put(keys[i], newRevisionTombstone(int64(i), int64(i), false))
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
Expand Down
Loading

0 comments on commit c0f09b3

Please sign in to comment.