diff --git a/etcdutl/snapshot/util.go b/etcdutl/snapshot/util.go deleted file mode 100644 index a4f3569c6884..000000000000 --- a/etcdutl/snapshot/util.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package snapshot - -import ( - "encoding/binary" -) - -type revision struct { - main int64 - sub int64 -} - -// GreaterThan should be synced with function in server -// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go -func (a revision) GreaterThan(b revision) bool { - if a.main > b.main { - return true - } - if a.main < b.main { - return false - } - return a.sub > b.sub -} - -// bytesToRev should be synced with function in server -// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -} - -// revToBytes should be synced with function in server -// https://github.com/etcd-io/etcd/blob/main/server/storage/mvcc/revision.go -func revToBytes(bytes []byte, rev revision) { - binary.BigEndian.PutUint64(bytes[0:8], uint64(rev.main)) - bytes[8] = '_' - binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) -} diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 60580006536b..28a8455a910b 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -34,6 +34,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/snapshot" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" @@ -157,8 +158,8 @@ func (s *v3Manager) Status(dbPath string) (ds Status, err error) { return fmt.Errorf("cannot write to bucket %s", err.Error()) } if iskeyb { - rev := bytesToRev(k) - ds.Revision = rev.main + rev := revision.BytesToRev(k) + ds.Revision = rev.Main } ds.TotalKey++ return nil @@ -353,36 +354,36 @@ func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error { return nil } -func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision, amount int64) revision { +func (s *v3Manager) unsafeBumpRevision(tx backend.BatchTx, latest revision.Revision, amount int64) revision.Revision { s.lg.Info( "bumping latest revision", - zap.Int64("latest-revision", latest.main), + zap.Int64("latest-revision", latest.Main), zap.Int64("bump-amount", amount), - zap.Int64("new-latest-revision", latest.main+amount), + zap.Int64("new-latest-revision", latest.Main+amount), ) - latest.main += amount - latest.sub = 0 - k := make([]byte, 17) - revToBytes(k, latest) + latest.Main += amount + latest.Sub = 0 + k := revision.NewRevBytes() + revision.RevToBytes(latest, k) tx.UnsafePut(schema.Key, k, []byte{}) return latest } -func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision) { +func (s *v3Manager) unsafeMarkRevisionCompacted(tx backend.BatchTx, latest revision.Revision) { s.lg.Info( "marking revision compacted", - zap.Int64("revision", latest.main), + zap.Int64("revision", latest.Main), ) - mvcc.UnsafeSetScheduledCompact(tx, latest.main) + mvcc.UnsafeSetScheduledCompact(tx, latest.Main) } -func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision, error) { - var latest revision +func (s *v3Manager) unsafeGetLatestRevision(tx backend.BatchTx) (revision.Revision, error) { + var latest revision.Revision err := tx.UnsafeForEach(schema.Key, func(k, _ []byte) (err error) { - rev := bytesToRev(k) + rev := revision.BytesToRev(k) if rev.GreaterThan(latest) { latest = rev diff --git a/pkg/revision/revision.go b/pkg/revision/revision.go new file mode 100644 index 000000000000..74db6d1e7e8a --- /dev/null +++ b/pkg/revision/revision.go @@ -0,0 +1,62 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package revision + +import "encoding/binary" + +// RevBytesLen is the byte length of a normal revision. +// First 8 bytes is the revision.main in big-endian format. The 9th byte +// is a '_'. The last 8 bytes is the revision.sub in big-endian format. +const RevBytesLen = 8 + 1 + 8 +const MarkedRevBytesLen = RevBytesLen + 1 + +// Revision indicates modification of the key-value space. +// The set of changes that share same main Revision changes the key-value space atomically. +type Revision struct { + // Main is the Main revision of a set of changes that happen atomically. + Main int64 + + // Sub is the Sub revision of a change in a set of changes that happen + // atomically. Each change has different increasing Sub revision in that + // set. + Sub int64 +} + +func (a Revision) GreaterThan(b Revision) bool { + if a.Main > b.Main { + return true + } + if a.Main < b.Main { + return false + } + return a.Sub > b.Sub +} + +func NewRevBytes() []byte { + return make([]byte, RevBytesLen, MarkedRevBytesLen) +} + +func RevToBytes(rev Revision, bytes []byte) { + binary.BigEndian.PutUint64(bytes, uint64(rev.Main)) + bytes[8] = '_' + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.Sub)) +} + +func BytesToRev(bytes []byte) Revision { + return Revision{ + Main: int64(binary.BigEndian.Uint64(bytes[0:8])), + Sub: int64(binary.BigEndian.Uint64(bytes[9:])), + } +} diff --git a/server/storage/mvcc/revision_test.go b/pkg/revision/revision_test.go similarity index 83% rename from server/storage/mvcc/revision_test.go rename to pkg/revision/revision_test.go index 46fcb483cf01..08e20f91c8a2 100644 --- a/server/storage/mvcc/revision_test.go +++ b/pkg/revision/revision_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package mvcc +package revision import ( "bytes" @@ -25,22 +25,22 @@ import ( // bytes slice. Moreover, the lexicographical order of its byte slice representation // follows the order of (main, sub). func TestRevision(t *testing.T) { - tests := []revision{ + tests := []Revision{ // order in (main, sub) {}, - {main: 1, sub: 0}, - {main: 1, sub: 1}, - {main: 2, sub: 0}, - {main: math.MaxInt64, sub: math.MaxInt64}, + {Main: 1, Sub: 0}, + {Main: 1, Sub: 1}, + {Main: 2, Sub: 0}, + {Main: math.MaxInt64, Sub: math.MaxInt64}, } bs := make([][]byte, len(tests)) for i, tt := range tests { - b := newRevBytes() - revToBytes(tt, b) + b := NewRevBytes() + RevToBytes(tt, b) bs[i] = b - if grev := bytesToRev(b); !reflect.DeepEqual(grev, tt) { + if grev := BytesToRev(b); !reflect.DeepEqual(grev, tt) { t.Errorf("#%d: revision = %+v, want %+v", i, grev, tt) } } diff --git a/server/storage/mvcc/hash.go b/server/storage/mvcc/hash.go index 385d0c97966f..1b2bf2c0567e 100644 --- a/server/storage/mvcc/hash.go +++ b/server/storage/mvcc/hash.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" ) @@ -30,7 +31,7 @@ const ( hashStorageMaxSize = 10 ) -func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) { +func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision.Revision]struct{}) (KeyValueHash, error) { h := newKVHasher(compactRevision, revision, keep) err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error { h.WriteKeyValue(k, v) @@ -43,10 +44,10 @@ type kvHasher struct { hash hash.Hash32 compactRevision int64 revision int64 - keep map[revision]struct{} + keep map[revision.Revision]struct{} } -func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher { +func newKVHasher(compactRev, rev int64, keep map[revision.Revision]struct{}) kvHasher { h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) h.Write(schema.Key.Name()) return kvHasher{ @@ -58,12 +59,12 @@ func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher { } func (h *kvHasher) WriteKeyValue(k, v []byte) { - kr := bytesToRev(k) - upper := revision{main: h.revision + 1} + kr := revision.BytesToRev(k) + upper := revision.Revision{Main: h.revision + 1} if !upper.GreaterThan(kr) { return } - lower := revision{main: h.compactRevision + 1} + lower := revision.Revision{Main: h.compactRevision + 1} // skip revisions that are scheduled for deletion // due to compacting; don't skip if there isn't one. if lower.GreaterThan(kr) && len(h.keep) > 0 { diff --git a/server/storage/mvcc/index.go b/server/storage/mvcc/index.go index 6f92a4aeeea5..aea5ce2fd6aa 100644 --- a/server/storage/mvcc/index.go +++ b/server/storage/mvcc/index.go @@ -18,18 +18,19 @@ import ( "sync" "github.com/google/btree" + "go.etcd.io/etcd/pkg/v3/revision" "go.uber.org/zap" ) type index interface { - Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) - Range(key, end []byte, atRev int64) ([][]byte, []revision) - Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) + Get(key []byte, atRev int64) (rev, created revision.Revision, ver int64, err error) + Range(key, end []byte, atRev int64) ([][]byte, []revision.Revision) + Revisions(key, end []byte, atRev int64, limit int) ([]revision.Revision, int) CountRevisions(key, end []byte, atRev int64) int - Put(key []byte, rev revision) - Tombstone(key []byte, rev revision) error - Compact(rev int64) map[revision]struct{} - Keep(rev int64) map[revision]struct{} + Put(key []byte, rev revision.Revision) + Tombstone(key []byte, rev revision.Revision) error + Compact(rev int64) map[revision.Revision]struct{} + Keep(rev int64) map[revision.Revision]struct{} Equal(b index) bool Insert(ki *keyIndex) @@ -51,30 +52,30 @@ func newTreeIndex(lg *zap.Logger) index { } } -func (ti *treeIndex) Put(key []byte, rev revision) { +func (ti *treeIndex) Put(key []byte, rev revision.Revision) { keyi := &keyIndex{key: key} ti.Lock() defer ti.Unlock() okeyi, ok := ti.tree.Get(keyi) if !ok { - keyi.put(ti.lg, rev.main, rev.sub) + keyi.put(ti.lg, rev.Main, rev.Sub) ti.tree.ReplaceOrInsert(keyi) return } - okeyi.put(ti.lg, rev.main, rev.sub) + okeyi.put(ti.lg, rev.Main, rev.Sub) } -func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { +func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision.Revision, ver int64, err error) { ti.RLock() defer ti.RUnlock() return ti.unsafeGet(key, atRev) } -func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error) { +func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision.Revision, ver int64, err error) { keyi := &keyIndex{key: key} if keyi = ti.keyIndex(keyi); keyi == nil { - return revision{}, revision{}, 0, ErrRevisionNotFound + return revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound } return keyi.get(ti.lg, atRev) } @@ -109,7 +110,7 @@ func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) { // Revisions returns limited number of revisions from key(included) to end(excluded) // at the given rev. The returned slice is sorted in the order of key. There is no limit if limit <= 0. // The second return parameter isn't capped by the limit and reflects the total number of revisions. -func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) { +func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision.Revision, total int) { ti.RLock() defer ti.RUnlock() @@ -118,7 +119,7 @@ func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs [] if err != nil { return nil, 0 } - return []revision{rev}, 1 + return []revision.Revision{rev}, 1 } ti.unsafeVisit(key, end, func(ki *keyIndex) bool { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { @@ -155,7 +156,7 @@ func (ti *treeIndex) CountRevisions(key, end []byte, atRev int64) int { return total } -func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) { +func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision.Revision) { ti.RLock() defer ti.RUnlock() @@ -164,7 +165,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs [] if err != nil { return nil, nil } - return [][]byte{key}, []revision{rev} + return [][]byte{key}, []revision.Revision{rev} } ti.unsafeVisit(key, end, func(ki *keyIndex) bool { if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { @@ -176,7 +177,7 @@ func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs [] return keys, revs } -func (ti *treeIndex) Tombstone(key []byte, rev revision) error { +func (ti *treeIndex) Tombstone(key []byte, rev revision.Revision) error { keyi := &keyIndex{key: key} ti.Lock() @@ -186,11 +187,11 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error { return ErrRevisionNotFound } - return ki.tombstone(ti.lg, rev.main, rev.sub) + return ki.tombstone(ti.lg, rev.Main, rev.Sub) } -func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { - available := make(map[revision]struct{}) +func (ti *treeIndex) Compact(rev int64) map[revision.Revision]struct{} { + available := make(map[revision.Revision]struct{}) ti.lg.Info("compact tree index", zap.Int64("revision", rev)) ti.Lock() clone := ti.tree.Clone() @@ -214,8 +215,8 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { } // Keep finds all revisions to be kept for a Compaction at the given rev. -func (ti *treeIndex) Keep(rev int64) map[revision]struct{} { - available := make(map[revision]struct{}) +func (ti *treeIndex) Keep(rev int64) map[revision.Revision]struct{} { + available := make(map[revision.Revision]struct{}) ti.RLock() defer ti.RUnlock() ti.tree.Ascend(func(keyi *keyIndex) bool { diff --git a/server/storage/mvcc/index_bench_test.go b/server/storage/mvcc/index_bench_test.go index 008a7d2ae912..c837e884c08a 100644 --- a/server/storage/mvcc/index_bench_test.go +++ b/server/storage/mvcc/index_bench_test.go @@ -17,6 +17,7 @@ package mvcc import ( "testing" + "go.etcd.io/etcd/pkg/v3/revision" "go.uber.org/zap" ) @@ -33,7 +34,7 @@ func benchmarkIndexCompact(b *testing.B, size int) { bytesN := 64 keys := createBytesSlice(bytesN, size) for i := 1; i < size; i++ { - kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) + kvindex.Put(keys[i], revision.Revision{Main: int64(i), Sub: int64(i)}) } b.ResetTimer() for i := 1; i < b.N; i++ { @@ -49,7 +50,7 @@ func BenchmarkIndexPut(b *testing.B) { keys := createBytesSlice(bytesN, b.N) b.ResetTimer() for i := 1; i < b.N; i++ { - kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) + kvindex.Put(keys[i], revision.Revision{Main: int64(i), Sub: int64(i)}) } } @@ -60,7 +61,7 @@ func BenchmarkIndexGet(b *testing.B) { bytesN := 64 keys := createBytesSlice(bytesN, b.N) for i := 1; i < b.N; i++ { - kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) + kvindex.Put(keys[i], revision.Revision{Main: int64(i), Sub: int64(i)}) } b.ResetTimer() for i := 1; i < b.N; i++ { diff --git a/server/storage/mvcc/index_test.go b/server/storage/mvcc/index_test.go index 7d947670d765..3a24fb1b1e65 100644 --- a/server/storage/mvcc/index_test.go +++ b/server/storage/mvcc/index_test.go @@ -19,30 +19,31 @@ import ( "testing" "github.com/google/btree" + "go.etcd.io/etcd/pkg/v3/revision" "go.uber.org/zap/zaptest" ) func TestIndexGet(t *testing.T) { ti := newTreeIndex(zaptest.NewLogger(t)) - ti.Put([]byte("foo"), revision{main: 2}) - ti.Put([]byte("foo"), revision{main: 4}) - ti.Tombstone([]byte("foo"), revision{main: 6}) + ti.Put([]byte("foo"), revision.Revision{Main: 2}) + ti.Put([]byte("foo"), revision.Revision{Main: 4}) + ti.Tombstone([]byte("foo"), revision.Revision{Main: 6}) tests := []struct { rev int64 - wrev revision - wcreated revision + wrev revision.Revision + wcreated revision.Revision wver int64 werr error }{ - {0, revision{}, revision{}, 0, ErrRevisionNotFound}, - {1, revision{}, revision{}, 0, ErrRevisionNotFound}, - {2, revision{main: 2}, revision{main: 2}, 1, nil}, - {3, revision{main: 2}, revision{main: 2}, 1, nil}, - {4, revision{main: 4}, revision{main: 2}, 2, nil}, - {5, revision{main: 4}, revision{main: 2}, 2, nil}, - {6, revision{}, revision{}, 0, ErrRevisionNotFound}, + {0, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {1, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {2, revision.Revision{Main: 2}, revision.Revision{Main: 2}, 1, nil}, + {3, revision.Revision{Main: 2}, revision.Revision{Main: 2}, 1, nil}, + {4, revision.Revision{Main: 4}, revision.Revision{Main: 2}, 2, nil}, + {5, revision.Revision{Main: 4}, revision.Revision{Main: 2}, 2, nil}, + {6, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, } for i, tt := range tests { rev, created, ver, err := ti.Get([]byte("foo"), tt.rev) @@ -63,7 +64,7 @@ func TestIndexGet(t *testing.T) { func TestIndexRange(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} - allRevs := []revision{{main: 1}, {main: 2}, {main: 3}} + allRevs := []revision.Revision{{Main: 1}, {Main: 2}, {Main: 3}} ti := newTreeIndex(zaptest.NewLogger(t)) for i := range allKeys { @@ -74,7 +75,7 @@ func TestIndexRange(t *testing.T) { tests := []struct { key, end []byte wkeys [][]byte - wrevs []revision + wrevs []revision.Revision }{ // single key that not found { @@ -122,9 +123,9 @@ func TestIndexRange(t *testing.T) { func TestIndexTombstone(t *testing.T) { ti := newTreeIndex(zaptest.NewLogger(t)) - ti.Put([]byte("foo"), revision{main: 1}) + ti.Put([]byte("foo"), revision.Revision{Main: 1}) - err := ti.Tombstone([]byte("foo"), revision{main: 2}) + err := ti.Tombstone([]byte("foo"), revision.Revision{Main: 2}) if err != nil { t.Errorf("tombstone error = %v, want nil", err) } @@ -133,7 +134,7 @@ func TestIndexTombstone(t *testing.T) { if err != ErrRevisionNotFound { t.Errorf("get error = %v, want ErrRevisionNotFound", err) } - err = ti.Tombstone([]byte("foo"), revision{main: 3}) + err = ti.Tombstone([]byte("foo"), revision.Revision{Main: 3}) if err != ErrRevisionNotFound { t.Errorf("tombstone error = %v, want %v", err, ErrRevisionNotFound) } @@ -141,7 +142,7 @@ func TestIndexTombstone(t *testing.T) { func TestIndexRevision(t *testing.T) { allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")} - allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}} + allRevs := []revision.Revision{{Main: 1}, {Main: 2}, {Main: 3}, {Main: 4}, {Main: 5}, {Main: 6}} ti := newTreeIndex(zaptest.NewLogger(t)) for i := range allKeys { @@ -152,7 +153,7 @@ func TestIndexRevision(t *testing.T) { key, end []byte atRev int64 limit int - wrevs []revision + wrevs []revision.Revision wcounts int }{ // single key that not found @@ -161,23 +162,23 @@ func TestIndexRevision(t *testing.T) { }, // single key that found { - []byte("foo"), nil, 6, 0, []revision{{main: 6}}, 1, + []byte("foo"), nil, 6, 0, []revision.Revision{{Main: 6}}, 1, }, // various range keys, fixed atRev, unlimited { - []byte("foo"), []byte("foo1"), 6, 0, []revision{{main: 6}}, 1, + []byte("foo"), []byte("foo1"), 6, 0, []revision.Revision{{Main: 6}}, 1, }, { - []byte("foo"), []byte("foo2"), 6, 0, []revision{{main: 6}, {main: 5}}, 2, + []byte("foo"), []byte("foo2"), 6, 0, []revision.Revision{{Main: 6}, {Main: 5}}, 2, }, { - []byte("foo"), []byte("fop"), 6, 0, []revision{{main: 6}, {main: 5}, {main: 4}}, 3, + []byte("foo"), []byte("fop"), 6, 0, []revision.Revision{{Main: 6}, {Main: 5}, {Main: 4}}, 3, }, { - []byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2, + []byte("foo1"), []byte("fop"), 6, 0, []revision.Revision{{Main: 5}, {Main: 4}}, 2, }, { - []byte("foo2"), []byte("fop"), 6, 0, []revision{{main: 4}}, 1, + []byte("foo2"), []byte("fop"), 6, 0, []revision.Revision{{Main: 4}}, 1, }, { []byte("foo3"), []byte("fop"), 6, 0, nil, 0, @@ -187,38 +188,38 @@ func TestIndexRevision(t *testing.T) { []byte("foo1"), []byte("fop"), 1, 0, nil, 0, }, { - []byte("foo1"), []byte("fop"), 2, 0, []revision{{main: 2}}, 1, + []byte("foo1"), []byte("fop"), 2, 0, []revision.Revision{{Main: 2}}, 1, }, { - []byte("foo1"), []byte("fop"), 3, 0, []revision{{main: 2}, {main: 3}}, 2, + []byte("foo1"), []byte("fop"), 3, 0, []revision.Revision{{Main: 2}, {Main: 3}}, 2, }, { - []byte("foo1"), []byte("fop"), 4, 0, []revision{{main: 2}, {main: 4}}, 2, + []byte("foo1"), []byte("fop"), 4, 0, []revision.Revision{{Main: 2}, {Main: 4}}, 2, }, { - []byte("foo1"), []byte("fop"), 5, 0, []revision{{main: 5}, {main: 4}}, 2, + []byte("foo1"), []byte("fop"), 5, 0, []revision.Revision{{Main: 5}, {Main: 4}}, 2, }, { - []byte("foo1"), []byte("fop"), 6, 0, []revision{{main: 5}, {main: 4}}, 2, + []byte("foo1"), []byte("fop"), 6, 0, []revision.Revision{{Main: 5}, {Main: 4}}, 2, }, // fixed range keys, fixed atRev, various limit { - []byte("foo"), []byte("fop"), 6, 1, []revision{{main: 6}}, 3, + []byte("foo"), []byte("fop"), 6, 1, []revision.Revision{{Main: 6}}, 3, }, { - []byte("foo"), []byte("fop"), 6, 2, []revision{{main: 6}, {main: 5}}, 3, + []byte("foo"), []byte("fop"), 6, 2, []revision.Revision{{Main: 6}, {Main: 5}}, 3, }, { - []byte("foo"), []byte("fop"), 6, 3, []revision{{main: 6}, {main: 5}, {main: 4}}, 3, + []byte("foo"), []byte("fop"), 6, 3, []revision.Revision{{Main: 6}, {Main: 5}, {Main: 4}}, 3, }, { - []byte("foo"), []byte("fop"), 3, 1, []revision{{main: 1}}, 3, + []byte("foo"), []byte("fop"), 3, 1, []revision.Revision{{Main: 1}}, 3, }, { - []byte("foo"), []byte("fop"), 3, 2, []revision{{main: 1}, {main: 2}}, 3, + []byte("foo"), []byte("fop"), 3, 2, []revision.Revision{{Main: 1}, {Main: 2}}, 3, }, { - []byte("foo"), []byte("fop"), 3, 3, []revision{{main: 1}, {main: 2}, {main: 3}}, 3, + []byte("foo"), []byte("fop"), 3, 3, []revision.Revision{{Main: 1}, {Main: 2}, {Main: 3}}, 3, }, } for i, tt := range tests { @@ -238,21 +239,21 @@ func TestIndexCompactAndKeep(t *testing.T) { tests := []struct { key []byte remove bool - rev revision - created revision + rev revision.Revision + created revision.Revision ver int64 }{ - {[]byte("foo"), false, revision{main: 1}, revision{main: 1}, 1}, - {[]byte("foo1"), false, revision{main: 2}, revision{main: 2}, 1}, - {[]byte("foo2"), false, revision{main: 3}, revision{main: 3}, 1}, - {[]byte("foo2"), false, revision{main: 4}, revision{main: 3}, 2}, - {[]byte("foo"), false, revision{main: 5}, revision{main: 1}, 2}, - {[]byte("foo1"), false, revision{main: 6}, revision{main: 2}, 2}, - {[]byte("foo1"), true, revision{main: 7}, revision{}, 0}, - {[]byte("foo2"), true, revision{main: 8}, revision{}, 0}, - {[]byte("foo"), true, revision{main: 9}, revision{}, 0}, - {[]byte("foo"), false, revision{10, 0}, revision{10, 0}, 1}, - {[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1}, + {[]byte("foo"), false, revision.Revision{Main: 1}, revision.Revision{Main: 1}, 1}, + {[]byte("foo1"), false, revision.Revision{Main: 2}, revision.Revision{Main: 2}, 1}, + {[]byte("foo2"), false, revision.Revision{Main: 3}, revision.Revision{Main: 3}, 1}, + {[]byte("foo2"), false, revision.Revision{Main: 4}, revision.Revision{Main: 3}, 2}, + {[]byte("foo"), false, revision.Revision{Main: 5}, revision.Revision{Main: 1}, 2}, + {[]byte("foo1"), false, revision.Revision{Main: 6}, revision.Revision{Main: 2}, 2}, + {[]byte("foo1"), true, revision.Revision{Main: 7}, revision.Revision{}, 0}, + {[]byte("foo2"), true, revision.Revision{Main: 8}, revision.Revision{}, 0}, + {[]byte("foo"), true, revision.Revision{Main: 9}, revision.Revision{}, 0}, + {[]byte("foo"), false, revision.Revision{Main: 10, Sub: 0}, revision.Revision{Main: 10, Sub: 0}, 1}, + {[]byte("foo1"), false, revision.Revision{Main: 10, Sub: 1}, revision.Revision{Main: 10, Sub: 1}, 1}, } // Continuous Compact and Keep @@ -274,7 +275,7 @@ func TestIndexCompactAndKeep(t *testing.T) { return aki.Less(bki) })} for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { + if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision.Revision{Main: i}) { if tt.remove { wti.Tombstone(tt.key, tt.rev) } else { @@ -306,7 +307,7 @@ func TestIndexCompactAndKeep(t *testing.T) { return aki.Less(bki) })} for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { + if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision.Revision{Main: i}) { if tt.remove { wti.Tombstone(tt.key, tt.rev) } else { @@ -320,7 +321,7 @@ func TestIndexCompactAndKeep(t *testing.T) { } } -func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { +func restore(ti *treeIndex, key []byte, created, modified revision.Revision, ver int64) { keyi := &keyIndex{key: key} ti.Lock() @@ -331,5 +332,5 @@ func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { ti.tree.ReplaceOrInsert(keyi) return } - okeyi.put(ti.lg, modified.main, modified.sub) + okeyi.put(ti.lg, modified.Main, modified.Sub) } diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index e7aac273c9e1..db10764466b1 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" + "go.etcd.io/etcd/pkg/v3/revision" "go.uber.org/zap" ) @@ -73,21 +74,21 @@ var ( // {empty} -> key SHOULD be removed. type keyIndex struct { key []byte - modified revision // the main rev of the last modification + modified revision.Revision // the main rev of the last modification generations []generation } // put puts a revision to the keyIndex. func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { - rev := revision{main: main, sub: sub} + rev := revision.Revision{Main: main, Sub: sub} if !rev.GreaterThan(ki.modified) { lg.Panic( "'put' with an unexpected smaller revision", - zap.Int64("given-revision-main", rev.main), - zap.Int64("given-revision-sub", rev.sub), - zap.Int64("modified-revision-main", ki.modified.main), - zap.Int64("modified-revision-sub", ki.modified.sub), + zap.Int64("given-revision-main", rev.Main), + zap.Int64("given-revision-sub", rev.Sub), + zap.Int64("modified-revision-main", ki.modified.Main), + zap.Int64("modified-revision-sub", ki.modified.Sub), ) } if len(ki.generations) == 0 { @@ -103,7 +104,7 @@ func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) { ki.modified = rev } -func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { +func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision.Revision, ver int64) { if len(ki.generations) != 0 { lg.Panic( "'restore' got an unexpected non-empty generations", @@ -112,7 +113,7 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int6 } ki.modified = modified - g := generation{created: created, ver: ver, revs: []revision{modified}} + g := generation{created: created, ver: ver, revs: []revision.Revision{modified}} ki.generations = append(ki.generations, g) keysGauge.Inc() } @@ -138,7 +139,7 @@ func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { // get gets the modified, created revision and version of the key that satisfies the given atRev. // Rev must be smaller than or equal to the given atRev. -func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { +func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision.Revision, ver int64, err error) { if ki.isEmpty() { lg.Panic( "'get' got an unexpected empty keyIndex", @@ -147,28 +148,28 @@ func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision } g := ki.findGeneration(atRev) if g.isEmpty() { - return revision{}, revision{}, 0, ErrRevisionNotFound + return revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound } - n := g.walk(func(rev revision) bool { return rev.main > atRev }) + n := g.walk(func(rev revision.Revision) bool { return rev.Main > atRev }) if n != -1 { return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil } - return revision{}, revision{}, 0, ErrRevisionNotFound + return revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound } // since returns revisions since the given rev. Only the revision with the // largest sub revision will be returned if multiple revisions have the same // main revision. -func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { +func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision.Revision { if ki.isEmpty() { lg.Panic( "'since' got an unexpected empty keyIndex", zap.String("key", string(ki.key)), ) } - since := revision{rev, 0} + since := revision.Revision{Main: rev, Sub: 0} var gi int // find the generations to start checking for gi = len(ki.generations) - 1; gi > 0; gi-- { @@ -181,21 +182,21 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { } } - var revs []revision + var revs []revision.Revision var last int64 for ; gi < len(ki.generations); gi++ { for _, r := range ki.generations[gi].revs { if since.GreaterThan(r) { continue } - if r.main == last { + if r.Main == last { // replace the revision with a new one that has higher sub value, // because the original one should not be seen by external revs[len(revs)-1] = r continue } revs = append(revs, r) - last = r.main + last = r.Main } } return revs @@ -205,7 +206,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { // revision than the given atRev except the largest one (If the largest one is // a tombstone, it will not be kept). // If a generation becomes empty during compaction, it will be removed. -func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { +func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision.Revision]struct{}) { if ki.isEmpty() { lg.Panic( "'compact' got an unexpected empty keyIndex", @@ -233,7 +234,7 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision] } // keep finds the revision to be kept if compact is called at given atRev. -func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { +func (ki *keyIndex) keep(atRev int64, available map[revision.Revision]struct{}) { if ki.isEmpty() { return } @@ -248,11 +249,11 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { } } -func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) { +func (ki *keyIndex) doCompact(atRev int64, available map[revision.Revision]struct{}) (genIdx int, revIndex int) { // walk until reaching the first revision smaller or equal to "atRev", // and add the revision to the available map - f := func(rev revision) bool { - if rev.main <= atRev { + f := func(rev revision.Revision) bool { + if rev.Main <= atRev { available[rev] = struct{}{} return false } @@ -262,7 +263,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (gen genIdx, g := 0, &ki.generations[0] // find first generation includes atRev or created after atRev for genIdx < len(ki.generations)-1 { - if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { + if tomb := g.revs[len(g.revs)-1].Main; tomb > atRev { break } genIdx++ @@ -292,11 +293,11 @@ func (ki *keyIndex) findGeneration(rev int64) *generation { } g := ki.generations[cg] if cg != lastg { - if tomb := g.revs[len(g.revs)-1].main; tomb <= rev { + if tomb := g.revs[len(g.revs)-1].Main; tomb <= rev { return nil } } - if g.revs[0].main <= rev { + if g.revs[0].Main <= rev { return &ki.generations[cg] } cg-- @@ -338,8 +339,8 @@ func (ki *keyIndex) String() string { // generation contains multiple revisions of a key. type generation struct { ver int64 - created revision // when the generation is created (put in first revision). - revs []revision + created revision.Revision // when the generation is created (put in first revision). + revs []revision.Revision } func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } @@ -349,7 +350,7 @@ func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } // walk returns until: 1. it finishes walking all pairs 2. the function returns false. // walk returns the position at where it stopped. If it stopped after // finishing walking, -1 will be returned. -func (g *generation) walk(f func(rev revision) bool) int { +func (g *generation) walk(f func(rev revision.Revision) bool) int { l := len(g.revs) for i := range g.revs { ok := f(g.revs[l-i-1]) diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index bb47d5f1e893..2d170c301e92 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "go.etcd.io/etcd/pkg/v3/revision" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -31,43 +32,43 @@ func TestKeyIndexGet(t *testing.T) { // {{8, 0}[1], {10, 0}[2], {12, 0}(t)[3]} // {{2, 0}[1], {4, 0}[2], {6, 0}(t)[3]} ki := newTestKeyIndex(zaptest.NewLogger(t)) - ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{})) + ki.compact(zaptest.NewLogger(t), 4, make(map[revision.Revision]struct{})) tests := []struct { rev int64 - wmod revision - wcreat revision + wmod revision.Revision + wcreat revision.Revision wver int64 werr error }{ - {17, revision{}, revision{}, 0, ErrRevisionNotFound}, - {16, revision{}, revision{}, 0, ErrRevisionNotFound}, + {17, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {16, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, // get on generation 3 - {15, revision{14, 1}, revision{14, 0}, 2, nil}, - {14, revision{14, 1}, revision{14, 0}, 2, nil}, + {15, revision.Revision{Main: 14, Sub: 1}, revision.Revision{Main: 14, Sub: 0}, 2, nil}, + {14, revision.Revision{Main: 14, Sub: 1}, revision.Revision{Main: 14, Sub: 0}, 2, nil}, - {13, revision{}, revision{}, 0, ErrRevisionNotFound}, - {12, revision{}, revision{}, 0, ErrRevisionNotFound}, + {13, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {12, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, // get on generation 2 - {11, revision{10, 0}, revision{8, 0}, 2, nil}, - {10, revision{10, 0}, revision{8, 0}, 2, nil}, - {9, revision{8, 0}, revision{8, 0}, 1, nil}, - {8, revision{8, 0}, revision{8, 0}, 1, nil}, + {11, revision.Revision{Main: 10, Sub: 0}, revision.Revision{Main: 8, Sub: 0}, 2, nil}, + {10, revision.Revision{Main: 10, Sub: 0}, revision.Revision{Main: 8, Sub: 0}, 2, nil}, + {9, revision.Revision{Main: 8, Sub: 0}, revision.Revision{Main: 8, Sub: 0}, 1, nil}, + {8, revision.Revision{Main: 8, Sub: 0}, revision.Revision{Main: 8, Sub: 0}, 1, nil}, - {7, revision{}, revision{}, 0, ErrRevisionNotFound}, - {6, revision{}, revision{}, 0, ErrRevisionNotFound}, + {7, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {6, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, // get on generation 1 - {5, revision{4, 0}, revision{2, 0}, 2, nil}, - {4, revision{4, 0}, revision{2, 0}, 2, nil}, + {5, revision.Revision{Main: 4, Sub: 0}, revision.Revision{Main: 2, Sub: 0}, 2, nil}, + {4, revision.Revision{Main: 4, Sub: 0}, revision.Revision{Main: 2, Sub: 0}, 2, nil}, - {3, revision{}, revision{}, 0, ErrRevisionNotFound}, - {2, revision{}, revision{}, 0, ErrRevisionNotFound}, - {1, revision{}, revision{}, 0, ErrRevisionNotFound}, - {0, revision{}, revision{}, 0, ErrRevisionNotFound}, + {3, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {2, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {1, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, + {0, revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, } for i, tt := range tests { @@ -89,13 +90,13 @@ func TestKeyIndexGet(t *testing.T) { func TestKeyIndexSince(t *testing.T) { ki := newTestKeyIndex(zaptest.NewLogger(t)) - ki.compact(zaptest.NewLogger(t), 4, make(map[revision]struct{})) + ki.compact(zaptest.NewLogger(t), 4, make(map[revision.Revision]struct{})) - allRevs := []revision{{4, 0}, {6, 0}, {8, 0}, {10, 0}, {12, 0}, {14, 1}, {16, 0}} + allRevs := []revision.Revision{{Main: 4, Sub: 0}, {Main: 6, Sub: 0}, {Main: 8, Sub: 0}, {Main: 10, Sub: 0}, {Main: 12, Sub: 0}, {Main: 14, Sub: 1}, {Main: 16, Sub: 0}} tests := []struct { rev int64 - wrevs []revision + wrevs []revision.Revision }{ {17, nil}, {16, allRevs[6:]}, @@ -131,8 +132,8 @@ func TestKeyIndexPut(t *testing.T) { wki := &keyIndex{ key: []byte("foo"), - modified: revision{5, 0}, - generations: []generation{{created: revision{5, 0}, ver: 1, revs: []revision{{main: 5}}}}, + modified: revision.Revision{Main: 5, Sub: 0}, + generations: []generation{{created: revision.Revision{Main: 5, Sub: 0}, ver: 1, revs: []revision.Revision{{Main: 5}}}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -142,8 +143,8 @@ func TestKeyIndexPut(t *testing.T) { wki = &keyIndex{ key: []byte("foo"), - modified: revision{7, 0}, - generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}}, + modified: revision.Revision{Main: 7, Sub: 0}, + generations: []generation{{created: revision.Revision{Main: 5, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 5}, {Main: 7}}}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -152,12 +153,12 @@ func TestKeyIndexPut(t *testing.T) { func TestKeyIndexRestore(t *testing.T) { ki := &keyIndex{key: []byte("foo")} - ki.restore(zaptest.NewLogger(t), revision{5, 0}, revision{7, 0}, 2) + ki.restore(zaptest.NewLogger(t), revision.Revision{Main: 5, Sub: 0}, revision.Revision{Main: 7, Sub: 0}, 2) wki := &keyIndex{ key: []byte("foo"), - modified: revision{7, 0}, - generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 7}}}}, + modified: revision.Revision{Main: 7, Sub: 0}, + generations: []generation{{created: revision.Revision{Main: 5, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 7}}}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -175,8 +176,8 @@ func TestKeyIndexTombstone(t *testing.T) { wki := &keyIndex{ key: []byte("foo"), - modified: revision{7, 0}, - generations: []generation{{created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}, {}}, + modified: revision.Revision{Main: 7, Sub: 0}, + generations: []generation{{created: revision.Revision{Main: 5, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 5}, {Main: 7}}}, {}}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -191,10 +192,10 @@ func TestKeyIndexTombstone(t *testing.T) { wki = &keyIndex{ key: []byte("foo"), - modified: revision{15, 0}, + modified: revision.Revision{Main: 15, Sub: 0}, generations: []generation{ - {created: revision{5, 0}, ver: 2, revs: []revision{{main: 5}, {main: 7}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 9}, {main: 15}}}, + {created: revision.Revision{Main: 5, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 5}, {Main: 7}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 9}, {Main: 15}}}, {}, }, } @@ -213,241 +214,241 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { compact int64 wki *keyIndex - wam map[revision]struct{} + wam map[revision.Revision]struct{} }{ { 1, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 2, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 2}, {Main: 4}, {Main: 6}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, { 2, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 2, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 2}, {Main: 4}, {Main: 6}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 2}: {}, + map[revision.Revision]struct{}{ + {Main: 2}: {}, }, }, { 3, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{2, 0}, ver: 3, revs: []revision{{main: 2}, {main: 4}, {main: 6}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 2, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 2}, {Main: 4}, {Main: 6}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 2}: {}, + map[revision.Revision]struct{}{ + {Main: 2}: {}, }, }, { 4, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 2, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 4}, {Main: 6}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 4}: {}, + map[revision.Revision]struct{}{ + {Main: 4}: {}, }, }, { 5, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{2, 0}, ver: 3, revs: []revision{{main: 4}, {main: 6}}}, - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 2, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 4}, {Main: 6}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 4}: {}, + map[revision.Revision]struct{}{ + {Main: 4}: {}, }, }, { 6, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, { 7, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, { 8, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 8}: {}, + map[revision.Revision]struct{}{ + {Main: 8}: {}, }, }, { 9, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 8}, {Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 8}: {}, + map[revision.Revision]struct{}{ + {Main: 8}: {}, }, }, { 10, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 10}: {}, + map[revision.Revision]struct{}{ + {Main: 10}: {}, }, }, { 11, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{8, 0}, ver: 3, revs: []revision{{main: 10}, {main: 12}}}, - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 8, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 10}, {Main: 12}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 10}: {}, + map[revision.Revision]struct{}{ + {Main: 10}: {}, }, }, { 12, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, { 13, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14}, {Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, { 14, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 14, sub: 1}: {}, + map[revision.Revision]struct{}{ + {Main: 14, Sub: 1}: {}, }, }, { 15, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ - {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14, sub: 1}, {main: 16}}}, + {created: revision.Revision{Main: 14, Sub: 0}, ver: 3, revs: []revision.Revision{{Main: 14, Sub: 1}, {Main: 16}}}, {}, }, }, - map[revision]struct{}{ - {main: 14, sub: 1}: {}, + map[revision.Revision]struct{}{ + {Main: 14, Sub: 1}: {}, }, }, { 16, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision.Revision{Main: 16, Sub: 0}, generations: []generation{ {}, }, }, - map[revision]struct{}{}, + map[revision.Revision]struct{}{}, }, } // Continuous Compaction and finding Keep ki := newTestKeyIndex(zaptest.NewLogger(t)) for i, tt := range tests { - am := make(map[revision]struct{}) + am := make(map[revision.Revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, am) if !reflect.DeepEqual(ki, kiclone) { @@ -456,7 +457,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { if !reflect.DeepEqual(am, tt.wam) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } - am = make(map[revision]struct{}) + am = make(map[revision.Revision]struct{}) ki.compact(zaptest.NewLogger(t), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -470,7 +471,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { ki = newTestKeyIndex(zaptest.NewLogger(t)) for i, tt := range tests { if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { - am := make(map[revision]struct{}) + am := make(map[revision.Revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, am) if !reflect.DeepEqual(ki, kiclone) { @@ -479,7 +480,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { if !reflect.DeepEqual(am, tt.wam) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } - am = make(map[revision]struct{}) + am = make(map[revision.Revision]struct{}) ki.compact(zaptest.NewLogger(t), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -494,7 +495,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { // Once Compaction and finding Keep for i, tt := range tests { ki := newTestKeyIndex(zaptest.NewLogger(t)) - am := make(map[revision]struct{}) + am := make(map[revision.Revision]struct{}) ki.keep(tt.compact, am) if !reflect.DeepEqual(ki, kiClone) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) @@ -502,7 +503,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { if !reflect.DeepEqual(am, tt.wam) { t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) } - am = make(map[revision]struct{}) + am = make(map[revision.Revision]struct{}) ki.compact(zaptest.NewLogger(t), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki) @@ -525,7 +526,7 @@ func cloneGeneration(g *generation) *generation { if g.revs == nil { return &generation{g.ver, g.created, nil} } - tmp := make([]revision, len(g.revs)) + tmp := make([]revision.Revision, len(g.revs)) copy(tmp, g.revs) return &generation{g.ver, g.created, tmp} } @@ -536,18 +537,18 @@ func TestKeyIndexCompactOnFurtherRev(t *testing.T) { ki := &keyIndex{key: []byte("foo")} ki.put(zaptest.NewLogger(t), 1, 0) ki.put(zaptest.NewLogger(t), 2, 0) - am := make(map[revision]struct{}) + am := make(map[revision.Revision]struct{}) ki.compact(zaptest.NewLogger(t), 3, am) wki := &keyIndex{ key: []byte("foo"), - modified: revision{2, 0}, + modified: revision.Revision{Main: 2, Sub: 0}, generations: []generation{ - {created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}}, + {created: revision.Revision{Main: 1, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 2}}}, }, } - wam := map[revision]struct{}{ - {main: 2}: {}, + wam := map[revision.Revision]struct{}{ + {Main: 2}: {}, } if !reflect.DeepEqual(ki, wki) { t.Errorf("ki = %+v, want %+v", ki, wki) @@ -572,9 +573,9 @@ func TestKeyIndexIsEmpty(t *testing.T) { { &keyIndex{ key: []byte("foo"), - modified: revision{2, 0}, + modified: revision.Revision{Main: 2, Sub: 0}, generations: []generation{ - {created: revision{1, 0}, ver: 2, revs: []revision{{main: 2}}}, + {created: revision.Revision{Main: 1, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 2}}}, }, }, false, @@ -644,7 +645,7 @@ func TestGenerationIsEmpty(t *testing.T) { }{ {nil, true}, {&generation{}, true}, - {&generation{revs: []revision{{main: 1}}}, false}, + {&generation{revs: []revision.Revision{{Main: 1}}}, false}, } for i, tt := range tests { g := tt.g.isEmpty() @@ -657,19 +658,19 @@ func TestGenerationIsEmpty(t *testing.T) { func TestGenerationWalk(t *testing.T) { g := &generation{ ver: 3, - created: revision{2, 0}, - revs: []revision{{main: 2}, {main: 4}, {main: 6}}, + created: revision.Revision{Main: 2, Sub: 0}, + revs: []revision.Revision{{Main: 2}, {Main: 4}, {Main: 6}}, } tests := []struct { - f func(rev revision) bool + f func(rev revision.Revision) bool wi int }{ - {func(rev revision) bool { return rev.main >= 7 }, 2}, - {func(rev revision) bool { return rev.main >= 6 }, 1}, - {func(rev revision) bool { return rev.main >= 5 }, 1}, - {func(rev revision) bool { return rev.main >= 4 }, 0}, - {func(rev revision) bool { return rev.main >= 3 }, 0}, - {func(rev revision) bool { return rev.main >= 2 }, -1}, + {func(rev revision.Revision) bool { return rev.Main >= 7 }, 2}, + {func(rev revision.Revision) bool { return rev.Main >= 6 }, 1}, + {func(rev revision.Revision) bool { return rev.Main >= 5 }, 1}, + {func(rev revision.Revision) bool { return rev.Main >= 4 }, 0}, + {func(rev revision.Revision) bool { return rev.Main >= 3 }, 0}, + {func(rev revision.Revision) bool { return rev.Main >= 2 }, -1}, } for i, tt := range tests { idx := g.walk(tt.f) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index eeb82c68bc90..70af0f90c0aa 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "go.etcd.io/etcd/pkg/v3/revision" + "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/schedule" "go.etcd.io/etcd/pkg/v3/traceutil" @@ -41,9 +43,8 @@ const ( // markedRevBytesLen is the byte length of marked revision. // The first `revBytesLen` bytes represents a normal revision. The last // one byte is the mark. - markedRevBytesLen = revBytesLen + 1 - markBytePosition = markedRevBytesLen - 1 - markTombstone byte = 't' + markBytePosition = revision.MarkedRevBytesLen - 1 + markTombstone byte = 't' ) var restoreChunkKeys = 10000 // non-const for testing @@ -320,9 +321,9 @@ func (s *store) Restore(b backend.Backend) error { func (s *store) restore() error { s.setupMetricsReporter() - min, max := newRevBytes(), newRevBytes() - revToBytes(revision{main: 1}, min) - revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) + min, max := revision.NewRevBytes(), revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: 1}, min) + revision.RevToBytes(revision.Revision{Main: math.MaxInt64, Sub: math.MaxInt64}, max) keyToLease := make(map[string]lease.LeaseID) @@ -359,9 +360,9 @@ func (s *store) restore() error { break } // next set begins after where this one ended - newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) - newMin.sub++ - revToBytes(newMin, min) + newMin := revision.BytesToRev(keys[len(keys)-1][:revision.RevBytesLen]) + newMin.Sub++ + revision.RevToBytes(newMin, min) } close(rkvc) @@ -448,18 +449,18 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int ok = true } } - rev := bytesToRev(rkv.key) - currentRev = rev.main + rev := revision.BytesToRev(rkv.key) + currentRev = rev.Main if ok { if isTombstone(rkv.key) { - if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { + if err := ki.tombstone(lg, rev.Main, rev.Sub); err != nil { lg.Warn("tombstone encountered error", zap.Error(err)) } continue } - ki.put(lg, rev.main, rev.sub) + ki.put(lg, rev.Main, rev.Sub) } else if !isTombstone(rkv.key) { - ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) + ki.restore(lg, revision.Revision{Main: rkv.kv.CreateRevision, Sub: 0}, rev, rkv.kv.Version) idx.Insert(ki) kiCache[rkv.kstr] = ki } @@ -521,10 +522,10 @@ func (s *store) setupMetricsReporter() { // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { - if len(b) != revBytesLen { + if len(b) != revision.RevBytesLen { lg.Panic( "cannot append tombstone mark to non-normal revision bytes", - zap.Int("expected-revision-bytes-size", revBytesLen), + zap.Int("expected-revision-bytes-size", revision.RevBytesLen), zap.Int("given-revision-bytes-size", len(b)), ) } @@ -533,7 +534,7 @@ func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { // isTombstone checks whether the revision bytes is a tombstone. func isTombstone(b []byte) bool { - return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone + return len(b) == revision.MarkedRevBytesLen && b[markBytePosition] == markTombstone } func (s *store) HashStorage() HashStorage { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index 9a0163697a75..d959a64884ea 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/server/v3/storage/schema" ) @@ -44,7 +45,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal h := newKVHasher(prevCompactRev, compactMainRev, keep) last := make([]byte, 8+1+8) for { - var rev revision + var rev revision.Revision start := time.Now() @@ -52,7 +53,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal tx.LockOutsideApply() keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for i := range keys { - rev = bytesToRev(keys[i]) + rev = revision.BytesToRev(keys[i]) if _, ok := keep[rev]; !ok { tx.UnsafeDelete(schema.Key, keys[i]) keyCompactions++ @@ -77,7 +78,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal tx.Unlock() // update last - revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last) + revision.RevToBytes(revision.Revision{Main: rev.Main, Sub: rev.Sub + 1}, last) // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer // gofail: var compactBeforeCommitBatch struct{} s.b.ForceCommit() diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index dd8837637aea..7e492946d31d 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap/zaptest" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" @@ -29,12 +30,12 @@ import ( ) func TestScheduleCompaction(t *testing.T) { - revs := []revision{{1, 0}, {2, 0}, {3, 0}} + revs := []revision.Revision{{Main: 1, Sub: 0}, {Main: 2, Sub: 0}, {Main: 3, Sub: 0}} tests := []struct { rev int64 - keep map[revision]struct{} - wrevs []revision + keep map[revision.Revision]struct{} + wrevs []revision.Revision }{ // compact at 1 and discard all history { @@ -51,17 +52,17 @@ func TestScheduleCompaction(t *testing.T) { // compact at 1 and keeps history one step earlier { 1, - map[revision]struct{}{ - {main: 1}: {}, + map[revision.Revision]struct{}{ + {Main: 1}: {}, }, revs, }, // compact at 1 and keeps history two steps earlier { 3, - map[revision]struct{}{ - {main: 2}: {}, - {main: 3}: {}, + map[revision.Revision]struct{}{ + {Main: 2}: {}, + {Main: 3}: {}, }, revs[1:], }, @@ -76,9 +77,9 @@ func TestScheduleCompaction(t *testing.T) { tx := s.b.BatchTx() tx.Lock() - ibytes := newRevBytes() + ibytes := revision.NewRevBytes() for _, rev := range revs { - revToBytes(rev, ibytes) + revision.RevToBytes(rev, ibytes) tx.UnsafePut(schema.Key, ibytes, []byte("bar")) } tx.Unlock() @@ -90,7 +91,7 @@ func TestScheduleCompaction(t *testing.T) { tx.Lock() for _, rev := range tt.wrevs { - revToBytes(rev, ibytes) + revision.RevToBytes(rev, ibytes) keys, _ := tx.UnsafeRange(schema.Key, ibytes, nil, 0) if len(keys) != 1 { t.Errorf("#%d: range on %v = %d, want 1", i, rev, len(keys)) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index af4c3846c330..f7e6e7be8664 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -33,6 +33,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/pkg/v3/testutil" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/pkg/v3/schedule" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" @@ -71,22 +72,22 @@ func TestStorePut(t *testing.T) { } tests := []struct { - rev revision + rev revision.Revision r indexGetResp rr *rangeResp - wrev revision + wrev revision.Revision wkey []byte wkv mvccpb.KeyValue - wputrev revision + wputrev revision.Revision }{ { - revision{1, 0}, - indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound}, + revision.Revision{Main: 1, Sub: 0}, + indexGetResp{revision.Revision{}, revision.Revision{}, 0, ErrRevisionNotFound}, nil, - revision{2, 0}, - newTestKeyBytes(lg, revision{2, 0}, false), + revision.Revision{Main: 2, Sub: 0}, + newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -95,15 +96,15 @@ func TestStorePut(t *testing.T) { Version: 1, Lease: 1, }, - revision{2, 0}, + revision.Revision{Main: 2, Sub: 0}, }, { - revision{1, 1}, - indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil}, - &rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}}, + revision.Revision{Main: 1, Sub: 1}, + indexGetResp{revision.Revision{Main: 2, Sub: 0}, revision.Revision{Main: 2, Sub: 0}, 1, nil}, + &rangeResp{[][]byte{newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 1}, false)}, [][]byte{kvb}}, - revision{2, 0}, - newTestKeyBytes(lg, revision{2, 0}, false), + revision.Revision{Main: 2, Sub: 0}, + newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -112,15 +113,15 @@ func TestStorePut(t *testing.T) { Version: 2, Lease: 2, }, - revision{2, 0}, + revision.Revision{Main: 2, Sub: 0}, }, { - revision{2, 0}, - indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil}, - &rangeResp{[][]byte{newTestKeyBytes(lg, revision{2, 1}, false)}, [][]byte{kvb}}, + revision.Revision{Main: 2, Sub: 0}, + indexGetResp{revision.Revision{Main: 2, Sub: 1}, revision.Revision{Main: 2, Sub: 0}, 2, nil}, + &rangeResp{[][]byte{newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 1}, false)}, [][]byte{kvb}}, - revision{3, 0}, - newTestKeyBytes(lg, revision{3, 0}, false), + revision.Revision{Main: 3, Sub: 0}, + newTestKeyBytes(lg, revision.Revision{Main: 3, Sub: 0}, false), mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -129,7 +130,7 @@ func TestStorePut(t *testing.T) { Version: 3, Lease: 3, }, - revision{3, 0}, + revision.Revision{Main: 3, Sub: 0}, }, } for i, tt := range tests { @@ -137,7 +138,7 @@ func TestStorePut(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev.main + s.currentRev = tt.rev.Main fi.indexGetRespc <- tt.r if tt.rr != nil { b.tx.rangeRespc <- *tt.rr @@ -164,13 +165,13 @@ func TestStorePut(t *testing.T) { t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact) } wact = []testutil.Action{ - {Name: "get", Params: []interface{}{[]byte("foo"), tt.wputrev.main}}, + {Name: "get", Params: []interface{}{[]byte("foo"), tt.wputrev.Main}}, {Name: "put", Params: []interface{}{[]byte("foo"), tt.wputrev}}, } if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev.main { + if s.currentRev != tt.wrev.Main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } @@ -180,7 +181,7 @@ func TestStorePut(t *testing.T) { func TestStoreRange(t *testing.T) { lg := zaptest.NewLogger(t) - key := newTestKeyBytes(lg, revision{2, 0}, false) + key := newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 0}, false) kv := mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -199,11 +200,11 @@ func TestStoreRange(t *testing.T) { r rangeResp }{ { - indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + indexRangeResp{[][]byte{[]byte("foo")}, []revision.Revision{{Main: 2, Sub: 0}}}, rangeResp{[][]byte{key}, [][]byte{kvb}}, }, { - indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}}, + indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision.Revision{{Main: 2, Sub: 0}, {Main: 3, Sub: 0}}}, rangeResp{[][]byte{key}, [][]byte{kvb}}, }, } @@ -229,8 +230,8 @@ func TestStoreRange(t *testing.T) { t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev) } - wstart := newRevBytes() - revToBytes(tt.idxr.revs[0], wstart) + wstart := revision.NewRevBytes() + revision.RevToBytes(tt.idxr.revs[0], wstart) wact := []testutil.Action{ {Name: "range", Params: []interface{}{schema.Key, wstart, []byte(nil), int64(0)}}, } @@ -253,7 +254,7 @@ func TestStoreRange(t *testing.T) { func TestStoreDeleteRange(t *testing.T) { lg := zaptest.NewLogger(t) - key := newTestKeyBytes(lg, revision{2, 0}, false) + key := newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 0}, false) kv := mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -267,24 +268,24 @@ func TestStoreDeleteRange(t *testing.T) { } tests := []struct { - rev revision + rev revision.Revision r indexRangeResp rr rangeResp wkey []byte - wrev revision + wrev revision.Revision wrrev int64 - wdelrev revision + wdelrev revision.Revision }{ { - revision{2, 0}, - indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}}, + revision.Revision{Main: 2, Sub: 0}, + indexRangeResp{[][]byte{[]byte("foo")}, []revision.Revision{{Main: 2, Sub: 0}}}, rangeResp{[][]byte{key}, [][]byte{kvb}}, - newTestKeyBytes(lg, revision{3, 0}, true), - revision{3, 0}, + newTestKeyBytes(lg, revision.Revision{Main: 3, Sub: 0}, true), + revision.Revision{Main: 3, Sub: 0}, 2, - revision{3, 0}, + revision.Revision{Main: 3, Sub: 0}, }, } for i, tt := range tests { @@ -292,7 +293,7 @@ func TestStoreDeleteRange(t *testing.T) { b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) - s.currentRev = tt.rev.main + s.currentRev = tt.rev.Main fi.indexRangeRespc <- tt.r b.tx.rangeRespc <- tt.rr @@ -320,7 +321,7 @@ func TestStoreDeleteRange(t *testing.T) { if g := fi.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("#%d: index action = %+v, want %+v", i, g, wact) } - if s.currentRev != tt.wrev.main { + if s.currentRev != tt.wrev.Main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } s.Close() @@ -335,9 +336,9 @@ func TestStoreCompact(t *testing.T) { fi := s.kvindex.(*fakeIndex) s.currentRev = 3 - fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}} - key1 := newTestKeyBytes(lg, revision{1, 0}, false) - key2 := newTestKeyBytes(lg, revision{2, 0}, false) + fi.indexCompactRespc <- map[revision.Revision]struct{}{{Main: 1, Sub: 0}: {}} + key1 := newTestKeyBytes(lg, revision.Revision{Main: 1, Sub: 0}, false) + key2 := newTestKeyBytes(lg, revision.Revision{Main: 2, Sub: 0}, false) b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}} @@ -353,10 +354,10 @@ func TestStoreCompact(t *testing.T) { wact := []testutil.Action{ {Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}}, {Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}}, - {Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision.Revision{Main: 3, Sub: 0})}}, {Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}}, {Name: "delete", Params: []interface{}{schema.Key, key2}}, - {Name: "put", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision{3, 0})}}, + {Name: "put", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, newTestRevBytes(revision.Revision{Main: 3, Sub: 0})}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) @@ -376,7 +377,7 @@ func TestStoreRestore(t *testing.T) { fi := s.kvindex.(*fakeIndex) defer s.Close() - putkey := newTestKeyBytes(lg, revision{3, 0}, false) + putkey := newTestKeyBytes(lg, revision.Revision{Main: 3, Sub: 0}, false) putkv := mvccpb.KeyValue{ Key: []byte("foo"), Value: []byte("bar"), @@ -388,7 +389,7 @@ func TestStoreRestore(t *testing.T) { if err != nil { t.Fatal(err) } - delkey := newTestKeyBytes(lg, revision{5, 0}, true) + delkey := newTestKeyBytes(lg, revision.Revision{Main: 5, Sub: 0}, true) delkv := mvccpb.KeyValue{ Key: []byte("foo"), } @@ -396,8 +397,8 @@ func TestStoreRestore(t *testing.T) { if err != nil { t.Fatal(err) } - b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} - b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{schema.FinishedCompactKeyName}, [][]byte{newTestRevBytes(revision.Revision{Main: 3, Sub: 0})}} + b.tx.rangeRespc <- rangeResp{[][]byte{schema.ScheduledCompactKeyName}, [][]byte{newTestRevBytes(revision.Revision{Main: 3, Sub: 0})}} b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}} b.tx.rangeRespc <- rangeResp{nil, nil} @@ -413,17 +414,17 @@ func TestStoreRestore(t *testing.T) { wact := []testutil.Action{ {Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []byte(nil), int64(0)}}, {Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []byte(nil), int64(0)}}, - {Name: "range", Params: []interface{}{schema.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}}, + {Name: "range", Params: []interface{}{schema.Key, newTestRevBytes(revision.Revision{Main: 1, Sub: 0}), newTestRevBytes(revision.Revision{Main: math.MaxInt64, Sub: math.MaxInt64}), int64(restoreChunkKeys)}}, } if g := b.tx.Action(); !reflect.DeepEqual(g, wact) { t.Errorf("tx actions = %+v, want %+v", g, wact) } gens := []generation{ - {created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}}, - {created: revision{0, 0}, ver: 0, revs: nil}, + {created: revision.Revision{Main: 4, Sub: 0}, ver: 2, revs: []revision.Revision{{Main: 3, Sub: 0}, {Main: 5, Sub: 0}}}, + {created: revision.Revision{Main: 0, Sub: 0}, ver: 0, revs: nil}, } - ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens} + ki := &keyIndex{key: []byte("foo"), modified: revision.Revision{Main: 5, Sub: 0}, generations: gens} wact = []testutil.Action{ {Name: "keyIndex", Params: []interface{}{ki}}, {Name: "insert", Params: []interface{}{ki}}, @@ -497,8 +498,8 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease) // write scheduled compaction, but not do compaction - rbytes := newRevBytes() - revToBytes(revision{main: 2}, rbytes) + rbytes := revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: 2}, rbytes) tx := s0.b.BatchTx() tx.Lock() UnsafeSetScheduledCompact(tx, 2) @@ -525,8 +526,8 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) } // check the key in backend is deleted - revbytes := newRevBytes() - revToBytes(revision{main: 1}, revbytes) + revbytes := revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: 1}, revbytes) // The disk compaction is done asynchronously and requires more time on slow disk. // try 5 times for CI with slow IO. @@ -541,7 +542,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { } return } - t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + t.Errorf("key for rev %+v still exists, want deleted", revision.BytesToRev(revbytes)) }) } } @@ -884,15 +885,15 @@ func merge(dst, src kvs) kvs { // TODO: test attach key to lessor -func newTestRevBytes(rev revision) []byte { - bytes := newRevBytes() - revToBytes(rev, bytes) +func newTestRevBytes(rev revision.Revision) []byte { + bytes := revision.NewRevBytes() + revision.RevToBytes(rev, bytes) return bytes } -func newTestKeyBytes(lg *zap.Logger, rev revision, tombstone bool) []byte { - bytes := newRevBytes() - revToBytes(rev, bytes) +func newTestKeyBytes(lg *zap.Logger, rev revision.Revision, tombstone bool) []byte { + bytes := revision.NewRevBytes() + revision.RevToBytes(rev, bytes) if tombstone { bytes = appendMarkTombstone(lg, bytes) } @@ -928,7 +929,7 @@ func newFakeIndex() *fakeIndex { indexGetRespc: make(chan indexGetResp, 1), indexRangeRespc: make(chan indexRangeResp, 1), indexRangeEventsRespc: make(chan indexRangeEventsResp, 1), - indexCompactRespc: make(chan map[revision]struct{}, 1), + indexCompactRespc: make(chan map[revision.Revision]struct{}, 1), } } @@ -988,19 +989,19 @@ func (b *fakeBackend) Close() error func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { - rev revision - created revision + rev revision.Revision + created revision.Revision ver int64 err error } type indexRangeResp struct { keys [][]byte - revs []revision + revs []revision.Revision } type indexRangeEventsResp struct { - revs []revision + revs []revision.Revision } type fakeIndex struct { @@ -1008,10 +1009,10 @@ type fakeIndex struct { indexGetRespc chan indexGetResp indexRangeRespc chan indexRangeResp indexRangeEventsRespc chan indexRangeEventsResp - indexCompactRespc chan map[revision]struct{} + indexCompactRespc chan map[revision.Revision]struct{} } -func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) { +func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision.Revision, int) { _, rev := i.Range(key, end, atRev) if len(rev) >= limit { rev = rev[:limit] @@ -1024,33 +1025,33 @@ func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int { return len(rev) } -func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) { +func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision.Revision, ver int64, err error) { i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}}) r := <-i.indexGetRespc return r.rev, r.created, r.ver, r.err } -func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) { +func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision.Revision) { i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}}) r := <-i.indexRangeRespc return r.keys, r.revs } -func (i *fakeIndex) Put(key []byte, rev revision) { +func (i *fakeIndex) Put(key []byte, rev revision.Revision) { i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}}) } -func (i *fakeIndex) Tombstone(key []byte, rev revision) error { +func (i *fakeIndex) Tombstone(key []byte, rev revision.Revision) error { i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}}) return nil } -func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision { +func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision.Revision { i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}}) r := <-i.indexRangeEventsRespc return r.revs } -func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { +func (i *fakeIndex) Compact(rev int64) map[revision.Revision]struct{} { i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}}) return <-i.indexCompactRespc } -func (i *fakeIndex) Keep(rev int64) map[revision]struct{} { +func (i *fakeIndex) Keep(rev int64) map[revision.Revision]struct{} { i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}}) return <-i.indexCompactRespc } diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index b93fcbe64da1..66b67e3a7cec 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" @@ -92,20 +93,20 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i } kvs := make([]mvccpb.KeyValue, limit) - revBytes := newRevBytes() + revBytes := revision.NewRevBytes() for i, revpair := range revpairs[:len(kvs)] { select { case <-ctx.Done(): return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err()) default: } - revToBytes(revpair, revBytes) + revision.RevToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0) if len(vs) != 1 { tr.s.lg.Fatal( "range failed to find revision pair", - zap.Int64("revision-main", revpair.main), - zap.Int64("revision-sub", revpair.sub), + zap.Int64("revision-main", revpair.Main), + zap.Int64("revision-sub", revpair.Sub), zap.Int64("revision-current", curRev), zap.Int64("range-option-rev", ro.Rev), zap.Int64("range-option-limit", ro.Limit), @@ -197,13 +198,13 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { // get its previous leaseID _, created, ver, err := tw.s.kvindex.Get(key, rev) if err == nil { - c = created.main + c = created.Main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) tw.trace.Step("get key's previous created_revision and leaseID") } - ibytes := newRevBytes() - idxRev := revision{main: rev, sub: int64(len(tw.changes))} - revToBytes(idxRev, ibytes) + ibytes := revision.NewRevBytes() + idxRev := revision.Revision{Main: rev, Sub: int64(len(tw.changes))} + revision.RevToBytes(idxRev, ibytes) ver = ver + 1 kv := mvccpb.KeyValue{ @@ -274,9 +275,9 @@ func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { } func (tw *storeTxnWrite) delete(key []byte) { - ibytes := newRevBytes() - idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} - revToBytes(idxRev, ibytes) + ibytes := revision.NewRevBytes() + idxRev := revision.Revision{Main: tw.beginRev + 1, Sub: int64(len(tw.changes))} + revision.RevToBytes(idxRev, ibytes) ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) diff --git a/server/storage/mvcc/revision.go b/server/storage/mvcc/revision.go deleted file mode 100644 index a910e177aef9..000000000000 --- a/server/storage/mvcc/revision.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mvcc - -import "encoding/binary" - -// revBytesLen is the byte length of a normal revision. -// First 8 bytes is the revision.main in big-endian format. The 9th byte -// is a '_'. The last 8 bytes is the revision.sub in big-endian format. -const revBytesLen = 8 + 1 + 8 - -// A revision indicates modification of the key-value space. -// The set of changes that share same main revision changes the key-value space atomically. -type revision struct { - // main is the main revision of a set of changes that happen atomically. - main int64 - - // sub is the sub revision of a change in a set of changes that happen - // atomically. Each change has different increasing sub revision in that - // set. - sub int64 -} - -func (a revision) GreaterThan(b revision) bool { - if a.main > b.main { - return true - } - if a.main < b.main { - return false - } - return a.sub > b.sub -} - -func newRevBytes() []byte { - return make([]byte, revBytesLen, markedRevBytesLen) -} - -func revToBytes(rev revision, bytes []byte) { - binary.BigEndian.PutUint64(bytes, uint64(rev.main)) - bytes[8] = '_' - binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) -} - -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -} diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index a002ada71777..018b9a34447c 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -15,6 +15,7 @@ package mvcc import ( + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" ) @@ -22,7 +23,7 @@ import ( func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) { _, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0) if len(finishedCompactBytes) != 0 { - return bytesToRev(finishedCompactBytes[0]).main, true + return revision.BytesToRev(finishedCompactBytes[0]).Main, true } return 0, false } @@ -30,7 +31,7 @@ func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found b func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) { _, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { - return bytesToRev(scheduledCompactBytes[0]).main, true + return revision.BytesToRev(scheduledCompactBytes[0]).Main, true } return 0, false } @@ -42,8 +43,8 @@ func SetScheduledCompact(tx backend.BatchTx, value int64) { } func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { - rbytes := newRevBytes() - revToBytes(revision{main: value}, rbytes) + rbytes := revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: value}, rbytes) tx.UnsafePut(schema.Meta, schema.ScheduledCompactKeyName, rbytes) } @@ -54,7 +55,7 @@ func SetFinishedCompact(tx backend.BatchTx, value int64) { } func UnsafeSetFinishedCompact(tx backend.BatchTx, value int64) { - rbytes := newRevBytes() - revToBytes(revision{main: value}, rbytes) + rbytes := revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: value}, rbytes) tx.UnsafePut(schema.Meta, schema.FinishedCompactKeyName, rbytes) } diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index 4e7b5a71407c..ee6f6418b237 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -20,6 +20,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/lease" "go.etcd.io/etcd/server/v3/storage/backend" @@ -352,9 +353,9 @@ func (s *watchableStore) syncWatchers() int { compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) - minBytes, maxBytes := newRevBytes(), newRevBytes() - revToBytes(revision{main: minRev}, minBytes) - revToBytes(revision{main: curRev + 1}, maxBytes) + minBytes, maxBytes := revision.NewRevBytes(), revision.NewRevBytes() + revision.RevToBytes(revision.Revision{Main: minRev}, minBytes) + revision.RevToBytes(revision.Revision{Main: curRev + 1}, maxBytes) // UnsafeRange returns keys and values. And in boltdb, keys are revisions. // values are actual key-value pairs in backend. @@ -428,7 +429,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m if isTombstone(revs[i]) { ty = mvccpb.DELETE // patch in mod revision so watchers won't skip - kv.ModRevision = bytesToRev(revs[i]).main + kv.ModRevision = revision.BytesToRev(revs[i]).Main } evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty}) } diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index 5c9df5e42e05..a9df37c1302f 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -24,6 +24,7 @@ import ( bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/api/v3/authpb" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/pkg/v3/revision" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" @@ -63,24 +64,12 @@ var decoders = map[string]decoder{ "meta": metaDecoder, } -type revision struct { - main int64 - sub int64 -} - -func bytesToRev(bytes []byte) revision { - return revision{ - main: int64(binary.BigEndian.Uint64(bytes[0:8])), - sub: int64(binary.BigEndian.Uint64(bytes[9:])), - } -} - func defaultDecoder(k, v []byte) { fmt.Printf("key=%q, value=%q\n", k, v) } func keyDecoder(k, v []byte) { - rev := bytesToRev(k) + rev := revision.BytesToRev(k) var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { panic(err) @@ -135,7 +124,7 @@ func metaDecoder(k, v []byte) { if string(k) == string(schema.MetaConsistentIndexKeyName) || string(k) == string(schema.MetaTermKeyName) { fmt.Printf("key=%q, value=%v\n", k, binary.BigEndian.Uint64(v)) } else if string(k) == string(schema.ScheduledCompactKeyName) || string(k) == string(schema.FinishedCompactKeyName) { - rev := bytesToRev(v) + rev := revision.BytesToRev(v) fmt.Printf("key=%q, value=%v\n", k, rev) } else { defaultDecoder(k, v)