Skip to content

Commit

Permalink
memdb: prevent iterator invalidation
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Jan 23, 2025
1 parent 61e09c6 commit 1a6d100
Show file tree
Hide file tree
Showing 18 changed files with 600 additions and 22 deletions.
4 changes: 2 additions & 2 deletions examples/gcworker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/rawkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
16 changes: 16 additions & 0 deletions internal/unionstore/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import (
"encoding/binary"
"math"

"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"

"github.com/tikv/client-go/v2/kv"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -223,6 +226,19 @@ func (cp *MemDBCheckpoint) IsSamePosition(other *MemDBCheckpoint) bool {
return cp.blocks == other.blocks && cp.offsetInBlock == other.offsetInBlock
}

func (cp *MemDBCheckpoint) LessThan(cp2 *MemDBCheckpoint) bool {
if cp == nil || cp2 == nil {
logutil.BgLogger().Panic("unexpected nil checkpoint", zap.Any("cp", cp), zap.Any("cp2", cp2))
}
if cp.blocks < cp2.blocks {
return true
}
if cp.blocks == cp2.blocks && cp.offsetInBlock < cp2.offsetInBlock {
return true
}
return false
}

func (a *MemdbArena) Checkpoint() MemDBCheckpoint {
snap := MemDBCheckpoint{
blockSize: a.blockSize,
Expand Down
21 changes: 21 additions & 0 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ type ART struct {
lastTraversedNode atomic.Uint64
hitCount atomic.Uint64
missCount atomic.Uint64

// The counter of every write operation, used to invalidate iterators that were created before the write operation.
SeqNo int
// increased by 1 when an operation that may affect the content returned by "snapshot iter" (i.e. stage[0]) happens.
// It's used to invalidate snapshot iterators.
// invariant: no concurrent access to it
SnapshotSeqNo int
}

func New() *ART {
Expand Down Expand Up @@ -115,6 +122,7 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
}
}

t.SeqNo++
if len(t.stages) == 0 {
t.dirty = true
}
Expand Down Expand Up @@ -479,6 +487,10 @@ func (t *ART) RevertToCheckpoint(cp *arena.MemDBCheckpoint) {
t.allocator.vlogAllocator.RevertToCheckpoint(t, cp)
t.allocator.vlogAllocator.Truncate(cp)
t.allocator.vlogAllocator.OnMemChange()
t.SeqNo++
if len(t.stages) == 0 || t.stages[0].LessThan(cp) {
t.SnapshotSeqNo++
}
}

func (t *ART) Stages() []arena.MemDBCheckpoint {
Expand All @@ -498,7 +510,9 @@ func (t *ART) Release(h int) {
if h != len(t.stages) {
panic("cannot release staging buffer")
}
t.SeqNo++
if h == 1 {
t.SnapshotSeqNo++
tail := t.checkpoint()
if !t.stages[0].IsSamePosition(&tail) {
t.dirty = true
Expand All @@ -519,6 +533,11 @@ func (t *ART) Cleanup(h int) {
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(tree.stages)=%v", h, len(t.stages)))
}

t.SeqNo++
if h == 1 {
t.SnapshotSeqNo++
}

cp := &t.stages[h-1]
if !t.vlogInvalid {
curr := t.checkpoint()
Expand All @@ -542,6 +561,8 @@ func (t *ART) Reset() {
t.allocator.nodeAllocator.Reset()
t.allocator.vlogAllocator.Reset()
t.lastTraversedNode.Store(arena.NullU64Addr)
t.SnapshotSeqNo++
t.SeqNo++
}

// DiscardValues releases the memory used by all values.
Expand Down
9 changes: 8 additions & 1 deletion internal/unionstore/art/art_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (t *ART) iter(lowerBound, upperBound []byte, reverse, includeFlags bool) (*
// this avoids the initial value of currAddr equals to endAddr.
currAddr: arena.BadAddr,
endAddr: arena.NullAddr,
seqNo: t.SeqNo,
}
it.init(lowerBound, upperBound)
if !it.valid {
Expand All @@ -76,9 +77,12 @@ type Iterator struct {
currLeaf *artLeaf
currAddr arena.MemdbArenaAddr
endAddr arena.MemdbArenaAddr

// only when seqNo == art.seqNo, the iterator is valid.
seqNo int
}

func (it *Iterator) Valid() bool { return it.valid }
func (it *Iterator) Valid() bool { return it.valid && it.seqNo == it.tree.SeqNo }
func (it *Iterator) Key() []byte { return it.currLeaf.GetKey() }
func (it *Iterator) Flags() kv.KeyFlags { return it.currLeaf.GetKeyFlags() }
func (it *Iterator) Value() []byte {
Expand All @@ -102,6 +106,9 @@ func (it *Iterator) Next() error {
// iterate is finished
return errors.New("Art: iterator is finished")
}
if it.seqNo != it.tree.SeqNo {
return errors.New(fmt.Sprintf("seqNo mismatch: iter=%d, art=%d", it.seqNo, it.tree.SeqNo))
}
if it.currAddr == it.endAddr {
it.valid = false
return nil
Expand Down
Loading

0 comments on commit 1a6d100

Please sign in to comment.