Skip to content

Commit

Permalink
fix initKeysAndMutations
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Jan 23, 2025
1 parent c46d490 commit 4c16a14
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
23 changes: 19 additions & 4 deletions internal/unionstore/art/art_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"fmt"
"sort"

"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"

"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
"github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -84,7 +87,21 @@ type Iterator struct {
ignoreSeqNo bool
}

func (it *Iterator) Valid() bool { return it.valid && (it.seqNo == it.tree.SeqNo || it.ignoreSeqNo) }
func (it *Iterator) checkSeqNo() {
if it.seqNo != it.tree.SeqNo && !it.ignoreSeqNo {
logutil.BgLogger().Panic(
"seqNo mismatch",
zap.Int("it seqNo", it.seqNo),
zap.Int("art seqNo", it.tree.SeqNo),
zap.Stack("stack"),
)
}
}

func (it *Iterator) Valid() bool {
it.checkSeqNo()
return it.valid
}
func (it *Iterator) Key() []byte { return it.currLeaf.GetKey() }
func (it *Iterator) Flags() kv.KeyFlags { return it.currLeaf.GetKeyFlags() }
func (it *Iterator) Value() []byte {
Expand All @@ -108,9 +125,7 @@ func (it *Iterator) Next() error {
// iterate is finished
return errors.New("Art: iterator is finished")
}
if !it.ignoreSeqNo && it.seqNo != it.tree.SeqNo {
return errors.New(fmt.Sprintf("seqNo mismatch: iter=%d, art=%d", it.seqNo, it.tree.SeqNo))
}
it.checkSeqNo()
if it.currAddr == it.endAddr {
it.valid = false
return nil
Expand Down
7 changes: 6 additions & 1 deletion txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {

var err error
var assertionError error
toUpdatePrewriteOnly := make([][]byte, 0)
for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
_ = err
key := it.Key()
Expand Down Expand Up @@ -607,7 +608,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
// due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase.
op = kvrpcpb.Op_CheckNotExists
checkCnt++
memBuf.UpdateFlags(key, kv.SetPrewriteOnly)
toUpdatePrewriteOnly = append(toUpdatePrewriteOnly, key)
} else {
if flags.HasNewlyInserted() {
// The delete-your-write keys in pessimistic transactions, only lock needed keys and skip
Expand Down Expand Up @@ -682,6 +683,10 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
}
}

for _, key := range toUpdatePrewriteOnly {
memBuf.UpdateFlags(key, kv.SetPrewriteOnly)
}

if c.mutations.Len() == 0 {
return nil
}
Expand Down

0 comments on commit 4c16a14

Please sign in to comment.