Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Mar 8, 2025
1 parent 81cd068 commit b1a07db
Show file tree
Hide file tree
Showing 26 changed files with 1,146 additions and 991 deletions.
310 changes: 12 additions & 298 deletions DEPS.bzl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Session interface {
GetGlobalVariable(name string) (string, error)
GetGlobalSysVar(name string) (string, error)
GetSessionCtx() sessionctx.Context
AlterTableMode(ctx context.Context, schemaID int64, tableID int64, tableMode model.TableModeState) error
}

// BatchCreateTableSession is an interface to batch create table parallelly
Expand Down
16 changes: 15 additions & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (gs *tidbSession) Close() {
gs.se.Close()
}

// GetGlobalVariables implements glue.Session.
// GetGlobalVariable implements glue.Session.
func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}
Expand All @@ -277,3 +277,17 @@ func (gs *tidbSession) GetGlobalSysVar(name string) (string, error) {
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}

func (gs *tidbSession) AlterTableMode(
_ context.Context,
schemaID int64,
tableID int64,
tableMode model.TableModeState) error {
d := domain.GetDomain(gs.se).DDLExecutor()
args := &model.AlterTableModeArgs{
SchemaID: schemaID,
TableID: tableID,
TableMode: tableMode,
}
return d.AlterTableMode(gs.se, args)
}
8 changes: 7 additions & 1 deletion br/pkg/gluetidb/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *mockSession) Close() {
s.se.Close()
}

// GetGlobalVariables implements glue.Session.
// GetGlobalVariable implements glue.Session.
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
if ret, ok := s.globalVars[name]; ok {
return ret, nil
Expand All @@ -110,6 +110,12 @@ func (s *mockSession) GetGlobalSysVar(string) (string, error) {
return "", nil
}

// AlterTableMode implements glue.Session.
func (*mockSession) AlterTableMode(_ context.Context, _ int64, _ int64, _ model.TableModeState) error {
log.Fatal("unimplemented AlterTableMode for mock session")
return nil
}

// MockGlue only used for test
type MockGlue struct {
se sessiontypes.Session
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/internal/prealloc_db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)
// not support placement policy, just ignore it
log.Warn("target tidb not support tidb_placement_mode, ignore create policies", zap.Error(err))
} else {
log.Info("set tidb_placement_mode success", zap.String("mode", policyMode))
log.Debug("set tidb_placement_mode success", zap.String("mode", policyMode))
supportPolicy = true
}
}
Expand Down
129 changes: 76 additions & 53 deletions br/pkg/restore/log_client/batch_meta_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/br/pkg/utils/consts"
"github.com/pingcap/tidb/pkg/meta/model"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -62,9 +62,8 @@ func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.Schemas
// RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
ctx context.Context,
isOnline bool,
hasExplicitFilter bool,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
) error {
// starts gc row collector
rp.client.RunGCRowsLoader(ctx)
Expand All @@ -86,15 +85,16 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
return errors.Trace(err)
}

if isOnline {

} else {
if !hasExplicitFilter {
// TODO, use same technique to update schema as online
// global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
// the latest schema update.
log.Info("updating schema version to do full reload")
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
return errors.Trace(err)
}
} else {
log.Info("skip doing full reload filtered PiTR")
}
return nil
}
Expand Down Expand Up @@ -166,64 +166,87 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(

// process entries to collect table IDs
for _, entry := range curSortedEntries {
// parse entry and do the table mapping
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf); err != nil {
return nil, errors.Trace(err)
// Process DDL job history keys (for deleted tables)
if utils.IsMetaDDLJobHistoryKey(entry.E.Key) && cf == consts.DefaultCF {
if err := mp.processDeletedTablesFromDDLJob(entry.E.Value); err != nil {
return nil, errors.Trace(err)
}
continue
}

// collect rename/partition exchange history
// get value from default cf and get the short value if possible from write cf
value, err := stream.ExtractValue(&entry.E, cf)
if err != nil {
// parse entry and do the table mapping, using tableHistoryManager as the collector
if err = mp.tableMappingManager.ParseMetaKvAndUpdateIdMapping(&entry.E, cf, mp.tableHistoryManager); err != nil {
return nil, errors.Trace(err)
}
}
return filteredEntries, nil
}

// write cf doesn't have short value in it
if value == nil {
continue
}
// processDeletedTablesFromDDLJob processes a DDL job and marks tables as deleted when appropriate
func (mp *MetaKVInfoProcessor) processDeletedTablesFromDDLJob(value []byte) error {
// Check if this is a DDL job that deletes tables
var job model.Job
if err := json.Unmarshal(value, &job); err != nil {
return errors.Trace(err)
}

if utils.IsMetaDBKey(entry.E.Key) {
rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
if err != nil {
return nil, errors.Trace(err)
// Handle multi-schema jobs
if job.MultiSchemaInfo != nil {
for i, sub := range job.MultiSchemaInfo.SubJobs {
proxyJob := sub.ToProxyJob(&job, i)
if err := mp.processDeletedTablesFromSingleJob(&proxyJob); err != nil {
return errors.Trace(err)
}
}
return nil
}

// Process single job
return mp.processDeletedTablesFromSingleJob(&job)
}

// processDeletedTablesFromSingleJob processes a single DDL job and marks tables as deleted when appropriate
func (mp *MetaKVInfoProcessor) processDeletedTablesFromSingleJob(job *model.Job) error {
// Process DDL jobs that delete tables
switch job.Type {
case model.ActionDropTable:
// Mark the table as deleted
mp.tableHistoryManager.MarkTableDeleted(job.TableID)

if meta.IsDBkey(rawKey.Field) {
var dbInfo model.DBInfo
if err := json.Unmarshal(value, &dbInfo); err != nil {
return nil, errors.Trace(err)
}
// collect db id -> name mapping during log backup, it will contain information about newly created db
mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
} else if !meta.IsDBkey(rawKey.Key) {
// also see RewriteMetaKvEntry
continue
} else if meta.IsTableKey(rawKey.Field) {
// collect table history indexed by table id, same id may have different table names in history
var tableInfo model.TableInfo
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
// cannot use dbib in the parsed table info cuz it might not set so default to 0
dbID, err := meta.ParseDBKey(rawKey.Key)
if err != nil {
return nil, errors.Trace(err)
}

// add to table rename history
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// track partitions if this is a partitioned table
if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
mp.tableHistoryManager.AddPartitionHistory(def.ID, tableInfo.Name.String(), dbID, tableInfo.ID)
}
}
// Check for partitions that might be dropped
args, err := model.GetDropTableArgs(job)
if err == nil && len(args.OldPartitionIDs) > 0 {
mp.tableHistoryManager.MarkTablesDeleted(args.OldPartitionIDs)
}
case model.ActionDropSchema:
// When dropping a schema, all tables in it are dropped
args, err := model.GetDropSchemaArgs(job)
if err == nil && len(args.AllDroppedTableIDs) > 0 {
mp.tableHistoryManager.MarkTablesDeleted(args.AllDroppedTableIDs)
}
case model.ActionTruncateTable:
// Truncate table creates a new table with a new ID, so the old one is effectively dropped
args, err := model.GetTruncateTableArgs(job)
if err == nil {
mp.tableHistoryManager.MarkTableDeleted(job.TableID)
if len(args.OldPartitionIDs) > 0 {
mp.tableHistoryManager.MarkTablesDeleted(args.OldPartitionIDs)
}
}
case model.ActionDropTablePartition:
// When dropping a partition, mark the partition IDs as deleted
args, err := model.GetTablePartitionArgs(job)
if err == nil && len(args.OldPhysicalTblIDs) > 0 {
mp.tableHistoryManager.MarkTablesDeleted(args.OldPhysicalTblIDs)
}
case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
// These actions can delete partitions
args, err := model.GetTablePartitionArgs(job)
if err == nil && len(args.OldPhysicalTblIDs) > 0 {
mp.tableHistoryManager.MarkTablesDeleted(args.OldPhysicalTblIDs)
}
}
return filteredEntries, nil
return nil
}

func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager {
Expand Down
57 changes: 51 additions & 6 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func readFilteredFullBackupTables(
tableAdded = true
continue
}
if !piTRIdTracker.ContainsTableId(db.Info.ID, table.Info.ID) {
if !piTRIdTracker.ContainsTableId(table.Info.ID) {
continue
}
tables[table.Info.ID] = table
Expand Down Expand Up @@ -1525,13 +1525,58 @@ func (rc *LogClient) UpdateSchemaVersionFullReload(ctx context.Context) error {
return nil
}

// SetTableModeToNormal sets the table mode back to normal from restore mode.
func (rc *LogClient) SetTableModeToNormal(ctx context.Context, schemaID int64, tableID int64) error {
sql := fmt.Sprintf("ALTER TABLE `%d`.`%d` MODE = 'normal'", schemaID, tableID)
return rc.unsafeSession.ExecuteInternal(ctx, sql)
// SetTableModeToNormal sets the table mode back to normal from restore mode if needed.
func (rc *LogClient) SetTableModeToNormal(ctx context.Context, schemaReplace *stream.SchemasReplace) error {
const batchSize = 8

var tables []struct {
schemaID int64
tableID int64
}

// collect all tables that need to be set to normal mode
for _, dbReplace := range schemaReplace.DbReplaceMap {
if dbReplace.FilteredOut {
continue
}
for _, tableReplace := range dbReplace.TableMap {
if tableReplace.FilteredOut {
continue
}
tables = append(tables, struct {
schemaID int64
tableID int64
}{
schemaID: dbReplace.DbID,
tableID: tableReplace.TableID,
})
}
}

log.Info("going to alter table mode", zap.Int("num", len(tables)))

// process tables in batches
// TODO: Need batch support from DDL
for i := 0; i < len(tables); i += batchSize {
end := i + batchSize
if end > len(tables) {
end = len(tables)
}

for _, table := range tables[i:end] {
log.Info("altering table mode", zap.Int64("schemaID", table.schemaID), zap.Any("table id", table.tableID))
if err := rc.unsafeSession.AlterTableMode(ctx, table.schemaID, table.tableID, model.TableModeNormal); err != nil {
log.Warn("failed to alter table mode, continuing with other tables",
zap.Int64("schemaID", table.schemaID),
zap.Any("tableID", table.tableID),
zap.Error(err))
}
}
}
return nil
}

// WrapCompactedFilesIteratorWithSplit applies a splitting strategy to the compacted files iterator.
// WrapCompactedFilesIterWithSplitHelper applies a splitting strategy to the compacted files iterator.
// It uses a region splitter to handle the splitting logic based on the provided rules and checkpoint sets.
func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
ctx context.Context,
Expand Down
Loading

0 comments on commit b1a07db

Please sign in to comment.