Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: expose etcd's transaction as LowLevelTxn interface to kv.Base #9016

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ error = '''
internal etcd transaction error occurred
'''

["PD:etcd:ErrEtcdTxnResponse"]
error = '''
etcd transaction returned invalid response: %v
'''

["PD:etcd:ErrEtcdURLMap"]
error = '''
etcd url map error
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ var (
ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease"))
ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal"))
ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict"))
ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
Expand Down
130 changes: 130 additions & 0 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"
"path"
"strings"
"time"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}

// CreateLowLevelTxn returns a new LowLevelTxn backed by an etcd transaction. The transaction exposes
// etcd's if-then-else pattern, and every key handed to it is resolved relative to this store's root path.
func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn {
	wrapper := new(lowLevelTxnWrapper)
	wrapper.rootPath = kv.rootPath
	wrapper.inner = NewSlowLogTxn(kv.client)
	return wrapper
}

// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
Expand Down Expand Up @@ -296,3 +305,124 @@ func (txn *etcdTxn) commit() error {
}
return nil
}

// lowLevelTxnWrapper implements LowLevelTxn on top of an etcd clientv3.Txn.
type lowLevelTxnWrapper struct {
// inner is the underlying etcd transaction. If/Then/Else replace it with the value returned by the
// corresponding clientv3.Txn call.
inner clientv3.Txn
// rootPath is joined (with "/") in front of every key sent to etcd and trimmed from keys in results.
rootPath string
}

// If implements LowLevelTxn interface for adding conditions to the transaction.
//
// Every condition key is resolved under the wrapper's root path. Existence checks are expressed through the
// key's create revision (> 0 means the key exists, = 0 means it does not); all other comparison types compare
// the stored value against the condition's Value. An unknown comparison type panics.
func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn {
	etcdCmps := make([]clientv3.Cmp, 0, len(conditions))
	for _, cond := range conditions {
		fullKey := strings.Join([]string{l.rootPath, cond.Key}, "/")

		var etcdCmp clientv3.Cmp
		switch cond.CmpType {
		case LowLevelCmpExists:
			etcdCmp = clientv3.Compare(clientv3.CreateRevision(fullKey), ">", 0)
		case LowLevelCmpNotExists:
			etcdCmp = clientv3.Compare(clientv3.CreateRevision(fullKey), "=", 0)
		case LowLevelCmpEqual:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "=", cond.Value)
		case LowLevelCmpNotEqual:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "!=", cond.Value)
		case LowLevelCmpGreater:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), ">", cond.Value)
		case LowLevelCmpLess:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "<", cond.Value)
		default:
			panic(fmt.Sprintf("unknown cmp type %v", cond.CmpType))
		}
		etcdCmps = append(etcdCmps, etcdCmp)
	}

	l.inner = l.inner.If(etcdCmps...)
	return l
}

// convertOps translates backend-agnostic LowLevelTxnOp values into etcd operations, preserving their order.
// Keys (and range end keys) are resolved under the wrapper's root path. For LowLevelOpGetRange, an EndKey of
// "\x00" is turned into a prefix scan on the start key instead of an explicit range. An unknown operation
// type panics.
func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op {
	converted := make([]clientv3.Op, 0, len(ops))
	for _, item := range ops {
		fullKey := strings.Join([]string{l.rootPath, item.Key}, "/")

		var etcdOp clientv3.Op
		switch item.OpType {
		case LowLevelOpPut:
			etcdOp = clientv3.OpPut(fullKey, item.Value)
		case LowLevelOpDelete:
			etcdOp = clientv3.OpDelete(fullKey)
		case LowLevelOpGet:
			etcdOp = clientv3.OpGet(fullKey)
		case LowLevelOpGetRange:
			var rangeOpt clientv3.OpOption
			if item.EndKey == "\x00" {
				rangeOpt = clientv3.WithPrefix()
			} else {
				fullEndKey := strings.Join([]string{l.rootPath, item.EndKey}, "/")
				rangeOpt = clientv3.WithRange(fullEndKey)
			}
			etcdOp = clientv3.OpGet(fullKey, rangeOpt, clientv3.WithLimit(int64(item.Limit)))
		default:
			panic(fmt.Sprintf("unknown op type %v", item.OpType))
		}
		converted = append(converted, etcdOp)
	}
	return converted
}

// Then implements LowLevelTxn interface. The given operations are converted to etcd operations and registered
// as the branch that runs when the transaction's conditions pass.
func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn {
	etcdOps := l.convertOps(ops)
	l.inner = l.inner.Then(etcdOps...)
	return l
}

// Else implements LowLevelTxn interface. The given operations are converted to etcd operations and registered
// as the branch that runs when the transaction's conditions do not pass.
func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn {
	etcdOps := l.convertOps(ops)
	l.inner = l.inner.Else(etcdOps...)
	return l
}

// Commit implements LowLevelTxn interface for committing the transaction.
//
// On success the returned result carries etcd's Succeeded flag and one result item per response of the
// executed branch, in the order etcd returned them. Keys in the result have the root path (and the "/" that
// follows it) stripped. An error from the underlying commit is returned as is; a response that is neither a
// put, a delete-range nor a range response yields ErrEtcdTxnResponse.
//
// NOTE(review): the context argument is currently unused, because l.inner.Commit() takes no context.
func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) {
	resp, err := l.inner.Commit()
	if err != nil {
		return LowLevelTxnResult{}, err
	}
	items := make([]LowLevelTxnResultItem, 0, len(resp.Responses))
	for i, respItem := range resp.Responses {
		// The zero value is an empty item; write operations keep it so that result indices still line up
		// with the operations of the executed branch.
		var resultItem LowLevelTxnResultItem
		if put := respItem.GetResponsePut(); put != nil {
			// convertOps builds puts without asking etcd for the previous key-value, so PrevKv is normally
			// nil and the item stays empty. If etcd does report one, surface it instead of dropping it.
			if put.PrevKv != nil {
				resultItem.KeyValuePairs = []KeyValuePair{l.toKeyValuePair(put.PrevKv.Key, put.PrevKv.Value)}
			}
		} else if del := respItem.GetResponseDeleteRange(); del != nil {
			// Nothing is extracted from a delete response; the item stays empty.
		} else if rangeResp := respItem.GetResponseRange(); rangeResp != nil {
			kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs))
			for _, item := range rangeResp.Kvs {
				kvs = append(kvs, l.toKeyValuePair(item.Key, item.Value))
			}
			resultItem.KeyValuePairs = kvs
		} else {
			return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs(
				fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem),
			)
		}
		items = append(items, resultItem)
	}
	return LowLevelTxnResult{
		Succeeded:   resp.Succeeded,
		ResultItems: items,
	}, nil
}

// toKeyValuePair converts a raw etcd key/value into a KeyValuePair whose key is relative to the root path.
func (l *lowLevelTxnWrapper) toKeyValuePair(rawKey, rawValue []byte) KeyValuePair {
	return KeyValuePair{
		Key:   strings.TrimPrefix(strings.TrimPrefix(string(rawKey), l.rootPath), "/"),
		Value: string(rawValue),
	}
}
151 changes: 147 additions & 4 deletions pkg/storage/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,160 @@ package kv

import "context"

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
type Txn interface {
// LowLevelTxnCmpType represents the comparison type that is used in the condition of LowLevelTxn.
// The supported values are the LowLevelCmp* constants.
type LowLevelTxnCmpType int

// LowLevelTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else`
// branch) of LowLevelTxn. The supported values are the LowLevelOp* constants.
type LowLevelTxnOpType int

// Comparison types accepted in LowLevelTxnCondition.CmpType. The value comparisons below are string
// comparisons; CheckOnValue treats them as not satisfied when the key does not exist.
// nolint:revive
const (
// LowLevelCmpEqual is satisfied when the key's value equals the condition's Value.
LowLevelCmpEqual LowLevelTxnCmpType = iota
// LowLevelCmpNotEqual is satisfied when the key's value differs from the condition's Value.
LowLevelCmpNotEqual
// LowLevelCmpLess is satisfied when the key's value is less than the condition's Value.
LowLevelCmpLess
// LowLevelCmpGreater is satisfied when the key's value is greater than the condition's Value.
LowLevelCmpGreater
// LowLevelCmpExists is satisfied when the key exists; the condition's Value is ignored.
LowLevelCmpExists
// LowLevelCmpNotExists is satisfied when the key does not exist; the condition's Value is ignored.
LowLevelCmpNotExists
)

// Operation types accepted in LowLevelTxnOp.OpType.
// nolint:revive
const (
// LowLevelOpPut writes the operation's Value to its Key.
LowLevelOpPut LowLevelTxnOpType = iota
// LowLevelOpDelete deletes the operation's Key.
LowLevelOpDelete
// LowLevelOpGet reads the single key given by the operation's Key.
LowLevelOpGet
// LowLevelOpGetRange reads a range of keys starting at the operation's Key, delimited by its EndKey and
// bounded by its Limit.
LowLevelOpGetRange
)

// LowLevelTxnCondition represents a condition in a LowLevelTxn.
type LowLevelTxnCondition struct {
// The key whose value or existence is checked. The etcd backend resolves it under the storage's root path.
Key string
// The kind of comparison to perform; one of the LowLevelCmp* constants.
CmpType LowLevelTxnCmpType
// The value to compare with. It's not used when CmpType is LowLevelCmpExists or LowLevelCmpNotExists.
Value string
}

// CheckOnValue reports whether the condition holds for a key whose current state is described by the
// arguments: `exists` tells whether the key is present and `value` is its content. Value comparisons are
// plain string comparisons and never hold for a missing key. An unknown CmpType panics.
func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool {
	switch c.CmpType {
	case LowLevelCmpExists:
		return exists
	case LowLevelCmpNotExists:
		return !exists
	case LowLevelCmpEqual:
		return exists && value == c.Value
	case LowLevelCmpNotEqual:
		return exists && value != c.Value
	case LowLevelCmpLess:
		return exists && value < c.Value
	case LowLevelCmpGreater:
		return exists && value > c.Value
	default:
		panic("unreachable")
	}
}

// LowLevelTxnOp represents an operation in a LowLevelTxn's `Then` or `Else` branch and will be executed according to
// the result of checking conditions.
type LowLevelTxnOp struct {
// The key to operate on. When the OpType is LowLevelOpGetRange, it is the start key of the range.
// The etcd backend resolves it under the storage's root path.
Key string
// The kind of operation to perform; one of the LowLevelOp* constants.
OpType LowLevelTxnOpType
// The value to write. The etcd backend only reads it when the OpType is LowLevelOpPut.
Value string
// The end key when the OpType is LowLevelOpGetRange. The etcd backend treats the special value "\x00" as a
// request to scan all keys prefixed with Key instead of an explicit range.
EndKey string
// The limit of the keys to get when the OpType is LowLevelOpGetRange. The etcd backend passes it to etcd's
// limit option unchanged.
Limit int
}

// KeyValuePair represents a pair of key and value.
type KeyValuePair struct {
// Key is the key of the pair. In results produced by the etcd backend, the storage's root path prefix is
// stripped from it.
Key string
// Value is the content stored under Key.
Value string
}

// LowLevelTxnResultItem represents the result of a single operation in a LowLevelTxn. Only read operations
// are expected to carry data; the etcd backend also emits an item for each write operation so that result
// indices line up with the operations.
type LowLevelTxnResultItem struct {
// KeyValuePairs holds the key-value pairs returned by the operation. It is empty when nothing was found.
KeyValuePairs []KeyValuePair
}

// LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches
// will be listed in `ResultItems` in the same order as the operations are added.
// Put and Delete operations also occupy a slot in `ResultItems`, so the indices stay aligned with the operations.
type LowLevelTxnResult struct {
// Whether the conditions of the transaction passed, which decides whether the `Then` branch or the `Else`
// branch was executed.
Succeeded bool
// The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on
// whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch.
// * For Put or Delete operations, the result is empty.
// * For Get operations, the result contains a key-value pair representing the get result. In case the key
// does not exist, its `KeyValuePairs` field will be empty.
// * For GetRange operations, the result is a list of the key-value pairs that are scanned.
ResultItems []LowLevelTxnResultItem
}

// LowLevelTxn is a low-level transaction interface. It follows the same pattern as etcd's transaction
// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the
// behavior is simulated.
// Considering that in different backends, the kv pairs may not have properties equivalent to etcd's
// version, create-time, etc., the abstracted LowLevelTxn interface does not support comparing on them.
// It only supports checking the value or whether the key exists.
// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior
// would be undefined.
type LowLevelTxn interface {
// If adds the conditions that decide which branch of the transaction is executed.
If(conditions ...LowLevelTxnCondition) LowLevelTxn
// Then adds the operations to execute when the conditions pass.
Then(ops ...LowLevelTxnOp) LowLevelTxn
// Else adds the operations to execute when the conditions do not pass.
Else(ops ...LowLevelTxnOp) LowLevelTxn
// Commit commits the transaction and returns which branch was executed along with the results of its
// operations.
Commit(ctx context.Context) (LowLevelTxnResult, error)
}

// BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations.
type BaseReadWrite interface {
// Save writes value under key.
Save(key, value string) error
// Remove deletes key.
Remove(key string) error
// Load reads the value stored under key.
// NOTE(review): what is returned for a missing key is not visible from this interface; confirm against
// the implementations.
Load(key string) (string, error)
// LoadRange reads the keys between key and endKey, returning at most limit entries as two slices.
// NOTE(review): presumably keys[i] corresponds to values[i]; the exact range bounds and the meaning of a
// zero limit depend on the implementation and should be confirmed there.
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
}

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
// It currently exposes exactly the read/write operations of BaseReadWrite.
type Txn interface {
BaseReadWrite
}

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Txn
BaseReadWrite
// RunInTxn runs the user provided function in a Transaction.
// If user provided function f returns a non-nil error, then
// transaction will not be committed, the same error will be
// returned by RunInTxn.
// Otherwise, it returns the error occurred during the
// transaction.
//
// This is a highly-simplified transaction interface. As
// etcd's transaction API is quite limited, it's hard to use it
// to provide a complete transaction model as how a normal database
// does. So when this API is running on etcd backend, each read on
// `txn` implicitly constructs a condition.
// (ref: https://etcd.io/docs/v3.5/learning/api/#transaction)
// When reading a range using `LoadRange`, for each key found in the
// range there will be a condition constructed. Be aware of the
// possibility of causing phantom read.
// RunInTxn may not suit all use cases. When RunInTxn is found to be
// unsuitable, consider using CreateLowLevelTxn instead.
//
// Note that transaction are not committed until RunInTxn returns nil.
// Note:
// 1. Load and LoadRange operations provides only stale read.
Expand All @@ -42,4 +178,11 @@ type Base interface {
// 2. Only when storage is etcd, does RunInTxn checks that
// values loaded during transaction has not been modified before commit.
RunInTxn(ctx context.Context, f func(txn Txn) error) error

// CreateLowLevelTxn creates a transaction that provides the same
// if-then-else API pattern as etcd's, which makes it possible to
// precisely control how etcd's transaction API is used when the
// backend is etcd. With other backend types, the behavior is
// simulated.
CreateLowLevelTxn() LowLevelTxn
}
Loading
Loading