Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log_backup: add lock for truncating (#49469) #49709

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata(ctx context.Contex
num = int64(len(removed))

if ms.DryRun {
log.Debug("dry run, skip deletion ...")
log.Info("dry run, skip deletion ...")
return num, notDeleted, nil
}

// remove data file groups
for _, f := range removed {
log.Debug("Deleting file", zap.String("path", f.Path))
log.Info("Deleting file", zap.String("path", f.Path))
if err := storage.DeleteFile(ctx, f.Path); err != nil {
log.Warn("File not deleted.", zap.String("path", f.Path), logutil.ShortError(err))
notDeleted = append(notDeleted, f.Path)
Expand All @@ -249,6 +249,7 @@ func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata(ctx context.Contex
ReplaceMetadata(meta, remainedDataFiles)

if ms.BeforeDoWriteBack != nil && ms.BeforeDoWriteBack(metaPath, meta) {
log.Info("Skipped writeback meta by the hook.", zap.String("meta", metaPath))
return num, notDeleted, nil
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"local.go",
"local_unix.go",
"local_windows.go",
"locking.go",
"memstore.go",
"noop.go",
"parse.go",
Expand Down Expand Up @@ -64,6 +65,7 @@ go_test(
"compress_test.go",
"gcs_test.go",
"local_test.go",
"locking_test.go",
"memstore_test.go",
"parse_test.go",
"s3_test.go",
Expand Down
124 changes: 124 additions & 0 deletions br/pkg/storage/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/zap"
)

// LockMeta is the meta information of a lock.
type LockMeta struct {
LockedAt time.Time `json:"locked_at"`
LockerHost string `json:"locker_host"`
LockerPID int `json:"locker_pid"`
Hint string `json:"hint"`
}

func (l LockMeta) String() string {
return fmt.Sprintf("Locked(at: %s, host: %s, pid: %d, hint: %s)", l.LockedAt.Format("2006-01-02 15:04:05"), l.LockerHost, l.LockerPID, l.Hint)
}

// ErrLocked is the error returned when the lock is held by others.
type ErrLocked struct {
Meta LockMeta
}

func (e ErrLocked) Error() string {
return fmt.Sprintf("locked, meta = %s", e.Meta)
}

// MakeLockMeta creates a LockMeta by the current node's metadata.
// Including current time and hostname, etc..
func MakeLockMeta(hint string) LockMeta {
hname, err := os.Hostname()
if err != nil {
hname = fmt.Sprintf("UnknownHost(err=%s)", err)
}
now := time.Now()
meta := LockMeta{
LockedAt: now,
LockerHost: hname,
Hint: hint,
LockerPID: os.Getpid(),
}
return meta
}

func readLockMeta(ctx context.Context, storage ExternalStorage, path string) (LockMeta, error) {
file, err := storage.ReadFile(ctx, path)
if err != nil {
return LockMeta{}, errors.Annotatef(err, "failed to read existed lock file %s", path)
}
meta := LockMeta{}
err = json.Unmarshal(file, &meta)
if err != nil {
return meta, errors.Annotatef(err, "failed to parse lock file %s", path)
}

return meta, nil
}

func putLockMeta(ctx context.Context, storage ExternalStorage, path string, meta LockMeta) error {
file, err := json.Marshal(meta)
if err != nil {
return errors.Annotatef(err, "failed to marshal lock meta %s", path)
}
err = storage.WriteFile(ctx, path, file)
if err != nil {
return errors.Annotatef(err, "failed to write lock meta at %s", path)
}
return nil
}

// TryLockRemote tries to create a "lock file" at the external storage.
// If success, we will create a file at the path provided. So others may not access the file then.
// Will return a `ErrLocked` if there is another process already creates the lock file.
// This isn't a strict lock like flock in linux: that means, the lock might be forced removed by
// manually deleting the "lock file" in external storage.
func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (err error) {
defer func() {
log.Info("Trying lock remote file.", zap.String("path", path), zap.String("hint", hint), logutil.ShortError(err))
}()
exists, err := storage.FileExists(ctx, path)
if err != nil {
return errors.Annotatef(err, "failed to check lock file %s exists", path)
}
if exists {
meta, err := readLockMeta(ctx, storage, path)
if err != nil {
return err
}
return ErrLocked{Meta: meta}
}

meta := MakeLockMeta(hint)
return putLockMeta(ctx, storage, path, meta)
}

// UnlockRemote removes the lock file at the specified path.
// Removing that file will release the lock.
func UnlockRemote(ctx context.Context, storage ExternalStorage, path string) error {
meta, err := readLockMeta(ctx, storage, path)
if err != nil {
return err
}
// NOTE: this is for debug usage. For now, there isn't an Compare-And-Swap
// operation in our ExternalStorage abstraction.
// So, once our lock has been overwritten or we are overwriting other's lock,
// this information will be useful for troubleshooting.
log.Info("Releasing lock.", zap.Stringer("meta", meta), zap.String("path", path))
err = storage.DeleteFile(ctx, path)
if err != nil {
return errors.Annotatef(err, "failed to delete lock file %s", path)
}
return nil
}
61 changes: 61 additions & 0 deletions br/pkg/storage/locking_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package storage_test

import (
"context"
"os"
"path/filepath"
"testing"

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func createMockStorage(t *testing.T) (storage.ExternalStorage, string) {
tempdir := t.TempDir()
storage, err := storage.New(context.Background(), &backup.StorageBackend{
Backend: &backup.StorageBackend_Local{
Local: &backup.Local{
Path: tempdir,
},
},
}, nil)
require.NoError(t, err)
return storage, tempdir
}

func requireFileExists(t *testing.T, path string) {
_, err := os.Stat(path)
require.NoError(t, err)
}

func requireFileNotExists(t *testing.T, path string) {
_, err := os.Stat(path)
require.True(t, os.IsNotExist(err))
}

func TestTryLockRemote(t *testing.T) {
ctx := context.Background()
strg, pth := createMockStorage(t)
err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.NoError(t, err)
requireFileExists(t, filepath.Join(pth, "test.lock"))
err = storage.UnlockRemote(ctx, strg, "test.lock")
require.NoError(t, err)
requireFileNotExists(t, filepath.Join(pth, "test.lock"))
}

func TestConflictLock(t *testing.T) {
ctx := context.Background()
strg, pth := createMockStorage(t)
err := storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.NoError(t, err)
err = storage.TryLockRemote(ctx, strg, "test.lock", "This file is mine!")
require.ErrorContains(t, err, "locked, meta = Locked")
requireFileExists(t, filepath.Join(pth, "test.lock"))
err = storage.UnlockRemote(ctx, strg, "test.lock")
require.NoError(t, err)
requireFileNotExists(t, filepath.Join(pth, "test.lock"))
}
25 changes: 19 additions & 6 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
flagStreamStartTS = "start-ts"
flagStreamEndTS = "end-ts"
flagGCSafePointTTS = "gc-ttl"

truncateLockPath = "truncating.lock"
hintOnTruncateLock = "There might be another truncate task running, or a truncate task that didn't exit properly. " +
"You may check the metadata and continue by wait other task finish or manually delete the lock file " + truncateLockPath + " at the external storage."
)

var (
Expand Down Expand Up @@ -930,7 +934,7 @@ func RunStreamStatus(
}

// RunStreamTruncate truncates the log that belong to (0, until-ts)
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) (err error) {
console := glue.GetConsole(g)
em := color.New(color.Bold).SprintFunc()
warn := color.New(color.Bold, color.FgHiRed).SprintFunc()
Expand All @@ -944,12 +948,21 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
ctx, cancelFn := context.WithCancel(c)
defer cancelFn()

storage, err := cfg.makeStorage(ctx)
extStorage, err := cfg.makeStorage(ctx)
if err != nil {
return err
}
if err := storage.TryLockRemote(ctx, extStorage, truncateLockPath, hintOnTruncateLock); err != nil {
return err
}
// NOTE: the `return` is in a clousure which is an argument.
// I guess this should be a bug of the linter.
defer utils.WithCleanUp(&err, 10*time.Second, func(ctx context.Context) error {
//nolint: all_revive,revive
return storage.UnlockRemote(ctx, extStorage, truncateLockPath)
})

sp, err := restore.GetTSFromFile(ctx, storage, restore.TruncateSafePointFileName)
sp, err := restore.GetTSFromFile(ctx, extStorage, restore.TruncateSafePointFileName)
if err != nil {
return err
}
Expand All @@ -967,7 +980,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, extStorage, cfg.Until)
if err != nil {
return err
}
Expand Down Expand Up @@ -995,7 +1008,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

if cfg.Until > sp && !cfg.DryRun {
if err := restore.SetTSToFile(
ctx, storage, cfg.Until, restore.TruncateSafePointFileName); err != nil {
ctx, extStorage, cfg.Until, restore.TruncateSafePointFileName); err != nil {
return err
}
}
Expand All @@ -1009,7 +1022,7 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
)
defer p.Close()

notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, storage, p.IncBy)
notDeleted, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, extStorage, p.IncBy)
if err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions br/pkg/utils/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -130,3 +131,20 @@ func CheckStoreLiveness(s *metapb.Store) error {
}
return nil
}

// WithCleanUp runs a function with a timeout, and register its error to its argument if there is one.
// This is useful while you want to run some must run but error-prone code in a defer context.
// Simple usage:
//
// func foo() (err error) {
// defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
// // do something
// return nil
// })
// }
func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := fn(ctx)
*errOut = multierr.Combine(err, *errOut)
}
38 changes: 38 additions & 0 deletions br/pkg/utils/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package utils

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func TestIsTypeCompatible(t *testing.T) {
Expand Down Expand Up @@ -136,3 +140,37 @@ func TestIsTypeCompatible(t *testing.T) {
require.True(t, IsTypeCompatible(*src, *target))
}
}

func TestWithCleanUp(t *testing.T) {
err1 := errors.New("meow?")
err2 := errors.New("nya?")

case1 := func() (err error) {
// NOTE: the `return` is in a clousure which is an argument.
// I guess this should be a bug of the linter.
defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
//nolint: all_revive,revive
return err1
})
return nil
}
require.ErrorIs(t, case1(), err1)

case2 := func() (err error) {
defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
//nolint: all_revive,revive
return err1
})
return err2
}
require.ElementsMatch(t, []error{err1, err2}, multierr.Errors(case2()))

case3 := func() (err error) {
defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
//nolint: all_revive,revive
return nil
})
return nil
}
require.NoError(t, case3())
}