diff --git a/errors.toml b/errors.toml index 9980a98ab14..303ef31ea16 100644 --- a/errors.toml +++ b/errors.toml @@ -421,6 +421,11 @@ error = ''' internal etcd transaction error occurred ''' +["PD:etcd:ErrEtcdTxnResponse"] +error = ''' +etcd transaction returned invalid response: %v +''' + ["PD:etcd:ErrEtcdURLMap"] error = ''' etcd url map error diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 834bf4f824e..5692f6cf037 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -390,6 +390,7 @@ var ( ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease")) ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal")) ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict")) + ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse")) ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut")) ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete")) ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet")) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index c25f4d66060..59ac1f0d7f7 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "path" "strings" "time" @@ -139,6 +140,14 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn { + return &lowLevelTxnWrapper{ + inner: NewSlowLogTxn(kv.client), + rootPath: kv.rootPath, + } +} + // SlowLogTxn wraps etcd transaction and log slow one. type SlowLogTxn struct { clientv3.Txn @@ -296,3 +305,124 @@ func (txn *etcdTxn) commit() error { } return nil } + +type lowLevelTxnWrapper struct { + inner clientv3.Txn + rootPath string +} + +// If implements LowLevelTxn interface for adding conditions to the transaction. +func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + cmpList := make([]clientv3.Cmp, 0, len(conditions)) + for _, c := range conditions { + key := strings.Join([]string{l.rootPath, c.Key}, "/") + if c.CmpType == LowLevelCmpExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0)) + } else if c.CmpType == LowLevelCmpNotExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + } else { + var cmpOp string + switch c.CmpType { + case LowLevelCmpEqual: + cmpOp = "=" + case LowLevelCmpNotEqual: + cmpOp = "!=" + case LowLevelCmpGreater: + cmpOp = ">" + case LowLevelCmpLess: + cmpOp = "<" + default: + panic(fmt.Sprintf("unknown cmp type %v", c.CmpType)) + } + cmpList = append(cmpList, clientv3.Compare(clientv3.Value(key), cmpOp, c.Value)) + } + } + l.inner = l.inner.If(cmpList...) + return l +} + +func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op { + opsList := make([]clientv3.Op, 0, len(ops)) + for _, op := range ops { + key := strings.Join([]string{l.rootPath, op.Key}, "/") + switch op.OpType { + case LowLevelOpPut: + opsList = append(opsList, clientv3.OpPut(key, op.Value)) + case LowLevelOpDelete: + opsList = append(opsList, clientv3.OpDelete(key)) + case LowLevelOpGet: + opsList = append(opsList, clientv3.OpGet(key)) + case LowLevelOpGetRange: + if op.EndKey == "\x00" { + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit)))) + } else { + endKey := strings.Join([]string{l.rootPath, op.EndKey}, "/") + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithRange(endKey), clientv3.WithLimit(int64(op.Limit)))) + } + default: + panic(fmt.Sprintf("unknown op type %v", op.OpType)) + } + } + return opsList +} + +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. +func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn { + l.inner = l.inner.Then(l.convertOps(ops)...) + return l +} + +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. +func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn { + l.inner = l.inner.Else(l.convertOps(ops)...) + return l +} + +// Commit implements LowLevelTxn interface for committing the transaction. +func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) { + resp, err := l.inner.Commit() + if err != nil { + return LowLevelTxnResult{}, err + } + items := make([]LowLevelTxnResultItem, 0, len(resp.Responses)) + for i, respItem := range resp.Responses { + var resultItem LowLevelTxnResultItem + if put := respItem.GetResponsePut(); put != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + resultItem = LowLevelTxnResultItem{} + if put.PrevKv != nil { + key := strings.TrimPrefix(strings.TrimPrefix(string(put.PrevKv.Key), l.rootPath), "/") + resultItem.KeyValuePairs = []KeyValuePair{{ + Key: key, + Value: string(put.PrevKv.Value), + }} + } + } else if del := respItem.GetResponseDeleteRange(); del != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + resultItem = LowLevelTxnResultItem{} + } else if rangeResp := respItem.GetResponseRange(); rangeResp != nil { + kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) + for _, kv := range rangeResp.Kvs { + key := strings.TrimPrefix(strings.TrimPrefix(string(kv.Key), l.rootPath), "/") + kvs = append(kvs, KeyValuePair{ + Key: key, + Value: string(kv.Value), + }) + } + resultItem = LowLevelTxnResultItem{ + KeyValuePairs: kvs, + } + } else { + return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem), + ) + } + items = append(items, resultItem) + } + return LowLevelTxnResult{ + Succeeded: resp.Succeeded, + ResultItems: items, + }, nil +} diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index a6e870db9c9..db619c1d3bc 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,24 +16,160 @@ package kv import "context" -// Txn bundles multiple operations into a single executable unit. -// It enables kv to atomically apply a set of updates. -type Txn interface { +// LowLevelTxnCmpType represents the comparison type that is used in the condition of LowLevelTxn. +type LowLevelTxnCmpType int + +// LowLevelTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` +// branch) of LowLevelTxn. +type LowLevelTxnOpType int + +// nolint:revive +const ( + LowLevelCmpEqual LowLevelTxnCmpType = iota + LowLevelCmpNotEqual + LowLevelCmpLess + LowLevelCmpGreater + LowLevelCmpExists + LowLevelCmpNotExists +) + +// nolint:revive +const ( + LowLevelOpPut LowLevelTxnOpType = iota + LowLevelOpDelete + LowLevelOpGet + LowLevelOpGetRange +) + +// LowLevelTxnCondition represents a condition in a LowLevelTxn. +type LowLevelTxnCondition struct { + Key string + CmpType LowLevelTxnCmpType + // The value to compare with. It's not used when CmpType is LowLevelCmpExists or LowLevelCmpNotExists. + Value string +} + +// CheckOnValue checks whether the condition is satisfied on the given value. +func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { + switch c.CmpType { + case LowLevelCmpEqual: + if exists && value == c.Value { + return true + } + case LowLevelCmpNotEqual: + if exists && value != c.Value { + return true + } + case LowLevelCmpLess: + if exists && value < c.Value { + return true + } + case LowLevelCmpGreater: + if exists && value > c.Value { + return true + } + case LowLevelCmpExists: + if exists { + return true + } + case LowLevelCmpNotExists: + if !exists { + return true + } + default: + panic("unreachable") + } + return false +} + +// LowLevelTxnOp represents an operation in a LowLevelTxn's `Then` or `Else` branch and will be executed according to +// the result of checking conditions. +type LowLevelTxnOp struct { + Key string + OpType LowLevelTxnOpType + Value string + // The end key when the OpType is LowLevelOpGetRange. + EndKey string + // The limit of the keys to get when the OpType is LowLevelOpGetRange. + Limit int +} + +// KeyValuePair represents a pair of key and value. +type KeyValuePair struct { + Key string + Value string +} + +// LowLevelTxnResultItem represents a single result of a read operation in a LowLevelTxn. +type LowLevelTxnResultItem struct { + KeyValuePairs []KeyValuePair +} + +// LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches +// will be listed in `ResultItems` in the same order as the operations are added. +// For Put or Delete operations, its corresponding result is the previous value before writing. +type LowLevelTxnResult struct { + Succeeded bool + // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on + // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. + // * For Put or Delete operations, the result is empty. + // * For Get operations, the result contains a key-value pair representing the get result. In case the key + // does not exist, its `KeyValuePairs` field will be empty. + // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. + ResultItems []LowLevelTxnResultItem +} + +// LowLevelTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction +// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the +// behavior is simulated. +// Considering that in different backends, the kv pairs may not have equivalent property of etcd's +// version, create-time, etc., the abstracted LowLevelTxn interface does not support comparing on them. +// It only supports checking the value or whether the key exists. +// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior +// would be undefined. +type LowLevelTxn interface { + If(conditions ...LowLevelTxnCondition) LowLevelTxn + Then(ops ...LowLevelTxnOp) LowLevelTxn + Else(ops ...LowLevelTxnOp) LowLevelTxn + Commit(ctx context.Context) (LowLevelTxnResult, error) +} + +// BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. +type BaseReadWrite interface { Save(key, value string) error Remove(key string) error Load(key string) (string, error) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) } +// Txn bundles multiple operations into a single executable unit. +// It enables kv to atomically apply a set of updates. +type Txn interface { + BaseReadWrite +} + // Base is an abstract interface for load/save pd cluster data. type Base interface { - Txn + BaseReadWrite // RunInTxn runs the user provided function in a Transaction. // If user provided function f returns a non-nil error, then // transaction will not be committed, the same error will be // returned by RunInTxn. // Otherwise, it returns the error occurred during the // transaction. + // + // This is a highly-simplified transaction interface. As + // etcd's transaction API is quite limited, it's hard to use it + // to provide a complete transaction model as how a normal database + // does. So when this API is running on etcd backend, each read on + // `txn` implicitly constructs a condition. + // (ref: https://etcd.io/docs/v3.5/learning/api/#transaction) + // When reading a range using `LoadRange`, for each key found in the + // range there will be a condition constructed. Be aware of the + // possibility of causing phantom read. + // RunInTxn may not suit all use cases. When RunInTxn is found not + // improper to use, consider using CreateLowLevelTxn instead. + // // Note that transaction are not committed until RunInTxn returns nil. // Note: // 1. Load and LoadRange operations provides only stale read. @@ -42,4 +178,11 @@ type Base interface { // 2. Only when storage is etcd, does RunInTxn checks that // values loaded during transaction has not been modified before commit. RunInTxn(ctx context.Context, f func(txn Txn) error) error + + // CreateLowLevelTxn creates a transaction that provides the if-then-else + // API pattern which is the same as how etcd does, makes it possible + // to precisely control how etcd's transaction API is used when the + // backend is etcd. When there's other backend types, the behavior will be + // simulated. + CreateLowLevelTxn() LowLevelTxn } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index f05561b0c0b..51ee02651bc 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -38,6 +38,7 @@ func TestEtcd(t *testing.T) { testRange(re, kv) testSaveMultiple(re, kv, 20) testLoadConflict(re, kv) + testLowLevelTxn(re, kv) } func TestLevelDB(t *testing.T) { @@ -49,6 +50,7 @@ func TestLevelDB(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) + testLowLevelTxn(re, kv) } func TestMemKV(t *testing.T) { @@ -57,6 +59,7 @@ func TestMemKV(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) + testLowLevelTxn(re, kv) } func testReadWrite(re *require.Assertions, kv Base) { @@ -159,3 +162,236 @@ func testLoadConflict(re *require.Assertions, kv Base) { // When other writer exists, loader must error. re.Error(kv.RunInTxn(context.Background(), conflictLoader)) } + +// nolint:unparam +func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...KeyValuePair) { + keys, values, err := kv.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 0) + re.NoError(err) + re.Equal(len(expected), len(keys)) + for i, key := range keys { + re.Equal(expected[i].Key, key) + re.Equal(expected[i].Value, values[i]) + } +} + +func testLowLevelTxn(re *require.Assertions, kv Base) { + // Test NotExists condition, putting in transaction. + res, err := kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpPut, + Value: "v1", + }, + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpPut, + Value: "v2", + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.ResultItems, 2) + re.Empty(res.ResultItems[0].KeyValuePairs) + re.Empty(res.ResultItems[1].KeyValuePairs) + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test Equal condition; reading in transaction. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpEqual, + Value: "v1", + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpGet, + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.ResultItems, 1) + re.Len(res.ResultItems[0].KeyValuePairs, 1) + re.Equal("v2", res.ResultItems[0].KeyValuePairs[0].Value) + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotEqual, + Value: "v1", + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpGetRange, + EndKey: "txn-k2\x00", + }, + LowLevelTxnOp{ + Key: "txn-k3", + OpType: LowLevelOpPut, + Value: "k3", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.False(res.Succeeded) + re.Len(res.ResultItems, 2) + re.Len(res.ResultItems[0].KeyValuePairs, 2) + re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.ResultItems[0].KeyValuePairs) + re.Empty(res.ResultItems[1].KeyValuePairs) + + mustHaveKeys(re, kv, "txn-", + KeyValuePair{Key: "txn-k1", Value: "v1"}, + KeyValuePair{Key: "txn-k2", Value: "v2"}, + KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Test Exists condition, deleting, overwriting. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpDelete, + }, + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpPut, + Value: "v22", + }, + // Delete not existing key. + LowLevelTxnOp{ + Key: "txn-k4", + OpType: LowLevelOpDelete, + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.ResultItems, 3) + for _, item := range res.ResultItems { + re.Empty(item.KeyValuePairs) + } + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Deleted keys can be regarded as not existing correctly. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpDelete, + }, + LowLevelTxnOp{ + Key: "txn-k3", + OpType: LowLevelOpDelete, + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.ResultItems, 2) + for _, item := range res.ResultItems { + re.Empty(item.KeyValuePairs) + } + mustHaveKeys(re, kv, "txn-") + + // The following tests only check the correctness of the conditions. + check := func(conditions []LowLevelTxnCondition, shouldSuccess bool) { + res, err := kv.CreateLowLevelTxn().If(conditions...).Commit(context.Background()) + re.NoError(err) + re.Equal(shouldSuccess, res.Succeeded) + } + + // "txn-k1" doesn't exist at this point. + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, true) + + err = kv.Save("txn-k1", "v1") + re.NoError(err) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, true) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, false) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, true) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v2"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v2"}}, true) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v0"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v2"}}, true) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v2"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v0"}}, true) + + // Test comparing with not-existing key. + err = kv.Remove("txn-k1") + re.NoError(err) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) + + // Test the conditions are conjunctions. + err = kv.Save("txn-k1", "v1") + re.NoError(err) + err = kv.Save("txn-k2", "v2") + re.NoError(err) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + }, true) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + }, false) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + }, false) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + }, false) +} diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 5a74c1928e8..3f3f619c570 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -80,6 +81,13 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *LevelDBKV) CreateLowLevelTxn() LowLevelTxn { + return &levelDBLowLevelTxnSimulator{ + kv: kv, + } +} + // levelDBTxn implements kv.Txn. // It utilizes leveldb.Batch to batch user operations to an atomic execution unit. type levelDBTxn struct { @@ -147,3 +155,132 @@ func (txn *levelDBTxn) commit() error { return txn.kv.Write(txn.batch, nil) } + +type levelDBLowLevelTxnSimulator struct { + kv *LevelDBKV + condition []LowLevelTxnCondition + onSuccessOps []LowLevelTxnOp + onFailureOps []LowLevelTxnOp +} + +// If implements LowLevelTxn interface for adding conditions to the transaction. +func (t *levelDBLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + t.condition = append(t.condition, conditions...) + return t +} + +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. +func (t *levelDBLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { + t.onSuccessOps = append(t.onSuccessOps, ops...) + return t +} + +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. +func (t *levelDBLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { + t.onFailureOps = append(t.onFailureOps, ops...) + return t +} + +// Commit implements LowLevelTxn interface for committing the transaction. +func (t *levelDBLowLevelTxnSimulator) Commit(_ctx context.Context) (res LowLevelTxnResult, err error) { + txn, err := t.kv.DB.OpenTransaction() + if err != nil { + return LowLevelTxnResult{}, err + } + defer func() { + // Set txn to nil when the function finished normally. + // When the function encounters any error and returns early, the transaction will be discarded here. + if txn != nil { + txn.Discard() + } + }() + + succeeds := true + for _, condition := range t.condition { + value, err := t.kv.DB.Get([]byte(condition.Key), nil) + valueStr := string(value) + exists := true + if err != nil { + if err == leveldb.ErrNotFound { + exists = false + } else { + return res, errors.WithStack(err) + } + } + + if !condition.CheckOnValue(valueStr, exists) { + succeeds = false + break + } + } + + ops := t.onSuccessOps + if !succeeds { + ops = t.onFailureOps + } + + results := make([]LowLevelTxnResultItem, 0, len(ops)) + + for _, operation := range ops { + switch operation.OpType { + case LowLevelOpPut: + err = txn.Put([]byte(operation.Key), []byte(operation.Value), nil) + if err != nil { + return res, errors.WithStack(err) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpDelete: + err = txn.Delete([]byte(operation.Key), nil) + if err != nil { + return res, errors.WithStack(err) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpGet: + value, err := txn.Get([]byte(operation.Key), nil) + result := LowLevelTxnResultItem{} + if err != nil { + if err != leveldb.ErrNotFound { + return res, errors.WithStack(err) + } + } else { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: operation.Key, + Value: string(value), + }) + } + results = append(results, result) + case LowLevelOpGetRange: + iter := txn.NewIterator(&util.Range{Start: []byte(operation.Key), Limit: []byte(operation.EndKey)}, nil) + result := LowLevelTxnResultItem{} + count := 0 + for iter.Next() { + if operation.Limit > 0 && count >= operation.Limit { + break + } + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: string(iter.Key()), + Value: string(iter.Value()), + }) + count++ + } + iter.Release() + results = append(results, result) + default: + panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) + } + } + + err = txn.Commit() + if err != nil { + return res, errors.WithStack(err) + } + // Avoid being discarded again in the defer block. + txn = nil + + return LowLevelTxnResult{ + Succeeded: succeeds, + ResultItems: results, + }, nil +} diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index cc5dca29851..9e402d31e8f 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "github.com/google/btree" @@ -52,11 +53,15 @@ func (s *memoryKVItem) Less(than *memoryKVItem) bool { func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() + return kv.loadNoLock(key), nil +} + +func (kv *memoryKV) loadNoLock(key string) string { item, ok := kv.tree.Get(memoryKVItem{key, ""}) if !ok { - return "", nil + return "" } - return item.value, nil + return item.value } // LoadRange loads the keys in the range of [key, endKey). @@ -69,6 +74,11 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string }) kv.RLock() defer kv.RUnlock() + keys, values := kv.loadRangeNoLock(key, endKey, limit) + return keys, values, nil +} + +func (kv *memoryKV) loadRangeNoLock(key, endKey string, limit int) ([]string, []string) { keys := make([]string, 0, limit) values := make([]string, 0, limit) kv.tree.AscendRange(memoryKVItem{key, ""}, memoryKVItem{endKey, ""}, func(item memoryKVItem) bool { @@ -79,24 +89,38 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string } return true }) - return keys, values, nil + return keys, values } // Save saves the key-value pair. func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() - kv.tree.ReplaceOrInsert(memoryKVItem{key, value}) + kv.saveNoLock(key, value) return nil } +func (kv *memoryKV) saveNoLock(key, value string) { + kv.tree.ReplaceOrInsert(memoryKVItem{key, value}) +} + // Remove removes the key. func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() + kv.removeNoLock(key) + return nil +} +func (kv *memoryKV) removeNoLock(key string) { kv.tree.Delete(memoryKVItem{key, ""}) - return nil +} + +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *memoryKV) CreateLowLevelTxn() LowLevelTxn { + return &memKvLowLevelTxnSimulator{ + kv: kv, + } } // memTxn implements kv.Txn. @@ -198,3 +222,93 @@ func (txn *memTxn) commit() error { } return nil } + +type memKvLowLevelTxnSimulator struct { + kv *memoryKV + conditions []LowLevelTxnCondition + onSuccessOps []LowLevelTxnOp + onFailureOps []LowLevelTxnOp +} + +// If implements LowLevelTxn interface for adding conditions to the transaction. +func (t *memKvLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + t.conditions = append(t.conditions, conditions...) + return t +} + +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. +func (t *memKvLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { + t.onSuccessOps = append(t.onSuccessOps, ops...) + return t +} + +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. +func (t *memKvLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { + t.onFailureOps = append(t.onFailureOps, ops...) + return t +} + +// Commit implements LowLevelTxn interface for committing the transaction. +func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnResult, error) { + t.kv.Lock() + defer t.kv.Unlock() + + succeeds := true + for _, condition := range t.conditions { + value := t.kv.loadNoLock(condition.Key) + // There's a convention to represent not-existing key with empty value. + exists := value != "" + + if !condition.CheckOnValue(value, exists) { + succeeds = false + break + } + } + + ops := t.onSuccessOps + if !succeeds { + ops = t.onFailureOps + } + + results := make([]LowLevelTxnResultItem, 0, len(ops)) + + for _, operation := range ops { + switch operation.OpType { + case LowLevelOpPut: + t.kv.saveNoLock(operation.Key, operation.Value) + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpDelete: + t.kv.removeNoLock(operation.Key) + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpGet: + value := t.kv.loadNoLock(operation.Key) + result := LowLevelTxnResultItem{} + if len(value) > 0 { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: operation.Key, + Value: value, + }) + } + results = append(results, result) + case LowLevelOpGetRange: + keys, values := t.kv.loadRangeNoLock(operation.Key, operation.EndKey, operation.Limit) + result := LowLevelTxnResultItem{} + for i := range keys { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: keys[i], + Value: values[i], + }) + } + results = append(results, result) + default: + panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) + } + } + + return LowLevelTxnResult{ + Succeeded: succeeds, + ResultItems: results, + }, nil +}