Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: concurrently repairing indexes #59159

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ type CheckpointIngestIndexRepairSQL struct {
IndexName string `json:"index-name"`
AddSQL string `json:"add-sql"`
AddArgs []any `json:"add-args"`

OldIndexIDFound bool `json:"-"`
IndexRepaired bool `json:"-"`
}

type CheckpointIngestIndexRepairSQLs struct {
Expand Down
80 changes: 80 additions & 0 deletions br/pkg/glue/progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"fmt"
"io"
"os"
"sync/atomic"
"time"

"github.com/fatih/color"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"go.uber.org/zap"
"golang.org/x/term"
)

Expand Down Expand Up @@ -188,3 +191,80 @@ func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
color.RedString("ABORTED"))),
)
}

type ProgressBar interface {
Increment()
Done()
}

type MultiProgress interface {
AddTextBar(string, int64) ProgressBar
Wait()
}

func (ops ConsoleOperations) StartMultiProgress() MultiProgress {
if !ops.OutputIsTTY() {
return &NopMultiProgress{}
}
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
return &TerminalMultiProgress{
progress: pb,
}
}

type NopMultiProgress struct{}

type LogBar struct {
name string
total int64
}

func (nmp *NopMultiProgress) AddTextBar(name string, total int64) ProgressBar {
log.Info("progress start", zap.String("name", name))
return &LogBar{
name: name,
total: total,
}
}

func (nmp *NopMultiProgress) Wait() {}

func (lb *LogBar) Increment() {
if atomic.AddInt64(&lb.total, -1) <= 0 {
log.Info("progress done", zap.String("name", lb.name))
}
}

func (lb *LogBar) Done() {}

type TerminalBar struct {
bar *mpb.Bar
}

func (tb *TerminalBar) Increment() {
tb.bar.Increment()
}

func (tb *TerminalBar) Done() {
tb.bar.Abort(false)
tb.bar.Wait()
}

type TerminalMultiProgress struct {
progress *mpb.Progress
}

func (tmp *TerminalMultiProgress) AddTextBar(name string, total int64) ProgressBar {
bar := tmp.progress.New(total,
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(name)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
color.RedString("ABORTED"),
)),
)
return &TerminalBar{bar: bar}
}

func (tmp *TerminalMultiProgress) Wait() {
tmp.progress.Wait()
}
106 changes: 80 additions & 26 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const maxSplitKeysOnce = 10240
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
const rawKVBatchCount = 64

// session count for repairing ingest indexes
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
const defaultRepairIndexSessionCount uint = 10

// LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration,
// including concurrency management, checkpoint handling, and file importing for efficient log processing.
type LogRestoreManager struct {
Expand Down Expand Up @@ -452,16 +455,48 @@ func (rc *LogClient) CleanUpKVFiles(
return rc.logRestoreManager.fileImporter.ClearFiles(ctx, rc.pdClient, "v1")
}

// Init create db connection and domain for storage.
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
var err error
rc.unsafeSession, err = g.CreateSession(store)
func createSession(ctx context.Context, g glue.Glue, store kv.Storage) (glue.Session, error) {
unsafeSession, err := g.CreateSession(store)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}

// Set SQL mode to None for avoiding SQL compatibility problem
err = rc.unsafeSession.Execute(ctx, "set @@sql_mode=''")
err = unsafeSession.Execute(ctx, "set @@sql_mode=''")
if err != nil {
return nil, errors.Trace(err)
}
return unsafeSession, nil
}

func createSessions(ctx context.Context, g glue.Glue, store kv.Storage, count uint) (unsafeSessions []glue.Session, createErr error) {
unsafeSessions = make([]glue.Session, 0, count)
defer func() {
if createErr != nil {
closeSessions(unsafeSessions)
}
}()
for range count {
unsafeSession, err := createSession(ctx, g, store)
if err != nil {
return nil, errors.Trace(err)
}
unsafeSessions = append(unsafeSessions, unsafeSession)
}
return unsafeSessions, nil
}

func closeSessions(sessions []glue.Session) {
for _, session := range sessions {
if session != nil {
session.Close()
}
}
}

// Init create db connection and domain for storage.
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
var err error
rc.unsafeSession, err = createSession(ctx, g, store)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1761,39 +1796,60 @@ func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *inge

info := rc.dom.InfoSchema()
console := glue.GetConsole(g)
NEXTSQL:
for _, sql := range sqls {
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)

tableInfo, err := info.TableByName(ctx, sql.SchemaName, sql.TableName)
if err != nil {
return errors.Trace(err)
}
oldIndexIDFound := false
sql.OldIndexIDFound = false
sql.IndexRepaired = false
if fromCheckpoint {
for _, idx := range tableInfo.Indices() {
indexInfo := idx.Meta()
if indexInfo.ID == sql.IndexID {
// the original index id is not dropped
oldIndexIDFound = true
sql.OldIndexIDFound = true
break
}
// what if index's state is not public?
if indexInfo.Name.O == sql.IndexName {
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
// find the same name index, but not the same index id,
// which means the repaired index id is created
if _, err := fmt.Fprintf(console.Out(), "%s ... %s\n", progressTitle, color.HiGreenString("SKIPPED DUE TO CHECKPOINT MODE")); err != nil {
return errors.Trace(err)
}
continue NEXTSQL
sql.IndexRepaired = true
break
}
}
}
}

if err := func(sql checkpoint.CheckpointIngestIndexRepairSQL) error {
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
defer w.Close()
sessionCount := defaultRepairIndexSessionCount
unsafeSessions, err := createSessions(ctx, g, rc.dom.Store(), sessionCount)
if err != nil {
return errors.Trace(err)
}
defer func() {
closeSessions(unsafeSessions)
}()
workerpool := tidbutil.NewWorkerPool(sessionCount, "repair ingest index")
eg, ectx := errgroup.WithContext(ctx)
mp := console.StartMultiProgress()
for _, sql := range sqls {
if sql.IndexRepaired {
continue
}
if ectx.Err() != nil {
break
}
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
w := mp.AddTextBar(progressTitle, 1)
workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
defer w.Done()

unsafeSession := unsafeSessions[id]
// TODO: When the TiDB supports the DROP and CREATE the same name index in one SQL,
// the checkpoint for ingest recorder can be removed and directly use the SQL:
// ALTER TABLE db.tbl DROP INDEX `i_1`, ADD IDNEX `i_1` ...
Expand All @@ -1804,8 +1860,8 @@ NEXTSQL:
// restored metakv and then skips repairing it.

// only when first execution or old index id is not dropped
if !fromCheckpoint || oldIndexIDFound {
if err := rc.unsafeSession.ExecuteInternal(ctx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
if !fromCheckpoint || sql.OldIndexIDFound {
if err := unsafeSession.ExecuteInternal(ectx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -1815,17 +1871,15 @@ NEXTSQL:
}
})
// create the repaired index when first execution or not found it
if err := rc.unsafeSession.ExecuteInternal(ctx, sql.AddSQL, sql.AddArgs...); err != nil {
return errors.Trace(err)
}
w.Inc()
if err := w.Wait(ctx); err != nil {
if err := unsafeSession.ExecuteInternal(ectx, sql.AddSQL, sql.AddArgs...); err != nil {
return errors.Trace(err)
}
w.Increment()
return nil
}(sql); err != nil {
return errors.Trace(err)
}
})
}
if err := eg.Wait(); err != nil {
return errors.Trace(err)
}

return nil
Expand Down
Loading