Skip to content

Commit

Permalink
br: add tests to cover sequence id, auto increment id and auto random…
Browse files Browse the repository at this point in the history
… id, revert meta kv capture logic (#59109)

close #59108
  • Loading branch information
Tristan1900 authored Jan 26, 2025
1 parent 1dad768 commit debfc32
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 67 deletions.
3 changes: 2 additions & 1 deletion br/pkg/stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ go_test(
],
embed = [":stream"],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//pkg/ddl",
"//pkg/kv",
"//pkg/meta",
"//pkg/meta/model",
"//pkg/parser/ast",
Expand Down
18 changes: 6 additions & 12 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ func (sr *SchemasReplace) rewriteKeyForDB(key []byte, cf string) ([]byte, error)

dbMap, exist := sr.DbMap[dbID]
if !exist {
// db filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find db id:%v in maps", dbID)
}

rawMetaKey.UpdateField(meta.DBkey(dbMap.DbID))
Expand All @@ -148,8 +147,7 @@ func (sr *SchemasReplace) rewriteDBInfo(value []byte) ([]byte, error) {

dbMap, exist := sr.DbMap[dbInfo.ID]
if !exist {
// db filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find db id:%v in maps", dbInfo.ID)
}

dbInfo.ID = dbMap.DbID
Expand Down Expand Up @@ -208,14 +206,12 @@ func (sr *SchemasReplace) rewriteKeyForTable(

dbReplace, exist := sr.DbMap[dbID]
if !exist {
// db filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find db id:%v in maps", dbID)
}

tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
// table filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find table id:%v in maps", tableID)
}

rawMetaKey.UpdateKey(meta.DBkey(dbReplace.DbID))
Expand All @@ -241,14 +237,12 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, er
// construct or find the id map.
dbReplace, exist = sr.DbMap[dbID]
if !exist {
// db filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find db id:%v in maps", dbID)
}

tableReplace, exist = dbReplace.TableMap[tableInfo.ID]
if !exist {
// table filtered out
return nil, nil
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "failed to find table id:%v in maps", tableInfo.ID)
}

// update table ID and partition ID.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) {
dbMap[dbID2] = NewDBReplace(db2.Name.O, dbID2+100)
dbMap[dbID2].TableMap[tableID2] = NewTableReplace(t2.Name.O, tableID2+100)

tc := NewTableMappingManager(dbMap, mockGenGenGlobalID)
tc := NewTableMappingManager(dbMap, mockGenGlobalID)

//exchange partition, t1 partition0 with the t2
t1Copy := t1.Clone()
Expand Down
181 changes: 132 additions & 49 deletions br/pkg/stream/table_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ package stream
import (
"context"
"encoding/json"
"fmt"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
Expand Down Expand Up @@ -60,6 +58,9 @@ func NewTableMappingManager(
}

// ParseMetaKvAndUpdateIdMapping collect table information
// the keys and values that are selected to parse here follows the implementation in rewrite_meta_rawkv. Maybe
// parsing a subset of these keys/values would suffice, but to make it safe we decide to parse exactly same as
// in rewrite_meta_rawkv.
func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error {
if !IsMetaDBKey(e.Key) {
return nil
Expand All @@ -70,92 +71,174 @@ func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf str
return errors.Trace(err)
}

value, err := extractValue(e, cf)
if err != nil {
return errors.Trace(err)
}
// sanity check
if value == nil {
log.Warn("entry suggests having short value but is nil")
return nil
}

if meta.IsDBkey(rawKey.Field) {
return tc.parseDBValueAndUpdateIdMapping(value)
// parse db key
err := tc.parseDBKeyAndUpdateIdMapping(rawKey.Field)
if err != nil {
return errors.Trace(err)
}

// parse value and update if exists
value, err := extractValue(e, cf)
if err != nil {
return errors.Trace(err)
}
if value != nil {
return tc.parseDBValueAndUpdateIdMapping(value)
}
} else if !meta.IsDBkey(rawKey.Key) {
return nil
}

if meta.IsTableKey(rawKey.Field) {
dbID, err := ParseDBIDFromTableKey(e.Key)
dbID, err := meta.ParseDBKey(rawKey.Key)
if err != nil {
return errors.Trace(err)
}

// parse table key and update
err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseTableKey)
if err != nil {
return errors.Trace(err)
}

// parse value and update if exists
value, err := extractValue(e, cf)
if err != nil {
return errors.Trace(err)
}
if value != nil {
return tc.parseTableValueAndUpdateIdMapping(dbID, value)
}
} else if meta.IsAutoIncrementIDKey(rawKey.Field) {
// parse auto increment key and update
err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoIncrementIDKey)
if err != nil {
return errors.Trace(err)
}
} else if meta.IsAutoTableIDKey(rawKey.Field) {
// parse auto table key and update
err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoTableIDKey)
if err != nil {
return errors.Trace(err)
}
} else if meta.IsSequenceKey(rawKey.Field) {
// parse sequence key and update
err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseSequenceKey)
if err != nil {
return errors.Trace(err)
}
} else if meta.IsAutoRandomTableIDKey(rawKey.Field) {
// parse sequence key and update
err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoRandomTableIDKey)
if err != nil {
return errors.Trace(err)
}
return tc.parseTableValueAndUpdateIdMapping(dbID, value)
}

return nil
}

func (tc *TableMappingManager) parseDBKeyAndUpdateIdMapping(field []byte) error {
dbID, err := meta.ParseDBKey(field)
if err != nil {
return errors.Trace(err)
}

_, err = tc.getOrCreateDBReplace(dbID)
return errors.Trace(err)
}

func (tc *TableMappingManager) parseDBValueAndUpdateIdMapping(value []byte) error {
dbInfo := new(model.DBInfo)
if err := json.Unmarshal(value, dbInfo); err != nil {
return errors.Trace(err)
}

if dr, exist := tc.DbReplaceMap[dbInfo.ID]; !exist {
newID, err := tc.genGlobalIdFn(context.Background())
if err != nil {
return errors.Trace(err)
}
tc.DbReplaceMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID)
tc.globalIdMap[dbInfo.ID] = newID
} else {
dr.Name = dbInfo.Name.O
dbReplace, err := tc.getOrCreateDBReplace(dbInfo.ID)
if err != nil {
return errors.Trace(err)
}
dbReplace.Name = dbInfo.Name.O
return nil
}

func (tc *TableMappingManager) parseTableValueAndUpdateIdMapping(dbID int64, value []byte) error {
var (
tableInfo model.TableInfo
err error
exist bool
dbReplace *DBReplace
tableReplace *TableReplace
)

if err := json.Unmarshal(value, &tableInfo); err != nil {
return errors.Trace(err)
}

// construct or find the id map.
dbReplace, exist = tc.DbReplaceMap[dbID]
// getOrCreateDBReplace gets an existing DBReplace or creates a new one if not found
func (tc *TableMappingManager) getOrCreateDBReplace(dbID int64) (*DBReplace, error) {
dbReplace, exist := tc.DbReplaceMap[dbID]
if !exist {
newID, err := tc.genGlobalIdFn(context.Background())
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
tc.globalIdMap[dbID] = newID
dbReplace = NewDBReplace("", newID)
tc.DbReplaceMap[dbID] = dbReplace
}
return dbReplace, nil
}

tableReplace, exist = dbReplace.TableMap[tableInfo.ID]
// getOrCreateTableReplace gets an existing TableReplace or creates a new one if not found
func (tc *TableMappingManager) getOrCreateTableReplace(dbReplace *DBReplace, tableID int64) (*TableReplace, error) {
tableReplace, exist := dbReplace.TableMap[tableID]
if !exist {
newID, exist := tc.globalIdMap[tableInfo.ID]
newID, exist := tc.globalIdMap[tableID]
if !exist {
var err error
newID, err = tc.genGlobalIdFn(context.Background())
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
tc.globalIdMap[tableInfo.ID] = newID
tc.globalIdMap[tableID] = newID
}
tableReplace = NewTableReplace("", newID)
dbReplace.TableMap[tableID] = tableReplace
}
return tableReplace, nil
}

func (tc *TableMappingManager) parseTableIdAndUpdateIdMapping(
key []byte,
field []byte,
parseField func([]byte) (tableID int64, err error)) error {
dbID, err := meta.ParseDBKey(key)
if err != nil {
return errors.Trace(err)
}

tableID, err := parseField(field)
if err != nil {
return errors.Trace(err)
}

dbReplace, err := tc.getOrCreateDBReplace(dbID)
if err != nil {
return errors.Trace(err)
}

_, err = tc.getOrCreateTableReplace(dbReplace, tableID)
if err != nil {
return errors.Trace(err)
}
return nil
}

tableReplace = NewTableReplace(tableInfo.Name.O, newID)
dbReplace.TableMap[tableInfo.ID] = tableReplace
} else {
tableReplace.Name = tableInfo.Name.O
func (tc *TableMappingManager) parseTableValueAndUpdateIdMapping(dbID int64, value []byte) error {
var tableInfo model.TableInfo
if err := json.Unmarshal(value, &tableInfo); err != nil {
return errors.Trace(err)
}

dbReplace, err := tc.getOrCreateDBReplace(dbID)
if err != nil {
return errors.Trace(err)
}

tableReplace, err := tc.getOrCreateTableReplace(dbReplace, tableInfo.ID)
if err != nil {
return errors.Trace(err)
}
tableReplace.Name = tableInfo.Name.O

// update table ID and partition ID.
tableInfo.ID = tableReplace.TableID
Expand Down Expand Up @@ -252,6 +335,6 @@ func extractValue(e *kv.Entry, cf string) ([]byte, error) {
}
return nil, nil
default:
panic(fmt.Sprintf("not support cf:%s", cf))
return nil, errors.Errorf("unsupported column family: %s", cf)
}
}
Loading

0 comments on commit debfc32

Please sign in to comment.