Skip to content

Commit

Permalink
extend 'copy bucket' to sync remote
Browse files Browse the repository at this point in the history
* CLI: add '--sync' option
* downloader: continued ref; reduce channel (burst) sizes
* part five

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 16, 2023
1 parent 385becc commit 4486b51
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 28 deletions.
1 change: 1 addition & 0 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
refreshFlag,
waitFlag,
waitJobXactFinishedFlag,
syncFlag,
},
commandRename: {
waitFlag,
Expand Down
10 changes: 8 additions & 2 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ var (
Name: "prefix",
Usage: "list objects that start with the specified prefix, e.g.:\n" +
indent4 + "\t'--prefix a/b/c' - list virtual directory a/b/c and/or objects from the virtual directory\n" +
indent4 + "\ta/b that have their names (relative to this directory) starting with the letter c",
indent4 + "\ta/b that have their names (relative to this directory) starting with the letter 'c'",
}
getObjPrefixFlag = cli.StringFlag{
Name: listObjPrefixFlag.Name,
Expand Down Expand Up @@ -569,7 +569,13 @@ var (
Name: "object-list,from",
Usage: "path to file containing JSON array of object names to download",
}
// sync
//
// syncFlag is the boolean '--sync' option. It is declared exactly once here;
// the handlers test it via flagIsSet(c, syncFlag).
syncFlag = cli.BoolFlag{
	Name: "sync",
	Usage: "synchronize destination bucket with its remote (e.g., Cloud) source;\n" +
		indent4 + "\tin particular, the option may entail removing of the objects that no longer exist remotely",
}

// dsort
dsortFsizeFlag = cli.StringFlag{Name: "fsize", Value: "1024", Usage: "size of the files in a shard"}
Expand Down
13 changes: 12 additions & 1 deletion cmd/cli/cli/tcbtco.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,24 @@ import (
// is the same as:
// (II) `ais cp from to --template abc"
func copyBucketHandler(c *cli.Context) (err error) {
	var bckFrom, bckTo cmn.Bck

	// At least one bucket argument is required.
	if c.NArg() == 0 {
		return missingArgumentsError(c, c.Command.ArgsUsage)
	}

	// Parse the arguments exactly once, choosing the form by argument count:
	//   - a single bucket together with '--sync': that bucket serves as both
	//     the source and the destination;
	//   - anything else: regular (source, destination) parsing.
	if c.NArg() == 1 && flagIsSet(c, syncFlag) {
		uri := c.Args().Get(0)
		bckFrom, err = parseBckURI(c, uri, true /*error only*/)
		if err != nil {
			err = incorrectUsageMsg(c, "invalid %s argument '%s' - %v", bucketSrcArgument, uri, err)
		} else {
			// assign the destination only after the source parsed successfully
			bckTo = bckFrom
		}
	} else {
		bckFrom, bckTo, err = parseBcks(c, bucketSrcArgument, bucketDstArgument, 0 /*shift*/)
	}
	if err != nil {
		return err
	}

	return tcbtco(c, "", bckFrom, bckTo, flagIsSet(c, copyAllObjsFlag))
}

Expand Down
2 changes: 1 addition & 1 deletion deploy/dev/local/aisnode_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ cat > $AIS_CONF_FILE <<EOL
"call_timeout": "10m",
"dsorter_mem_threshold": "100GB",
"compression": "${AIS_DSORT_COMPRESSION:-never}",
"bundle_multiplier": ${AIS_DSORT_BUNDLE_MULTIPLIER:-4}
"bundle_multiplier": ${AIS_DSORT_BUNDLE_MULTIPLIER:-4}
},
"tcb": {
"compression": "never",
Expand Down
12 changes: 12 additions & 0 deletions docs/cli/bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,18 @@ $ ais ls gs://coco-dataset --cached | grep Listed
Listed: 393 names
```

> Incidentally, notice the `--cached` difference:
```console
$ ais ls gs://coco-dataset --cached | grep Listed
Listed: 393 names

## vs _all_ including remote:

$ ais ls gs://coco-dataset | grep Listed
Listed: 2,290 names
```

### Options

```console
Expand Down
53 changes: 29 additions & 24 deletions ext/dload/diff_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,24 @@ type (
}

// DiffResolverResult is a single item produced by the resolver on its result channel.
// Each field is declared exactly once.
DiffResolverResult struct {
	Src    *cluster.LOM // source-side object, when the result refers to one
	Dst    *DstElement  // destination-side element, when the result refers to one
	Err    error        // non-nil when resolving this item failed
	Action uint8        // one of the DiffResolver* action codes (e.g., DiffResolverEOF, DiffResolverRecv)
}
)

//////////////////
// DiffResolver //
//////////////////

// NewDiffResolver returns a resolver bound to the given context, with its
// source, destination, and result channels buffered to the same burst size.
// TODO: configurable burst size of the channels, plus `chanFull` check
func NewDiffResolver(ctx DiffResolverCtx) *DiffResolver {
	// burst is the buffer capacity shared by all three channels
	const burst = 128
	return &DiffResolver{
		ctx:      ctx,
		srcCh:    make(chan *cluster.LOM, burst),
		dstCh:    make(chan *DstElement, burst),
		resultCh: make(chan DiffResolverResult, burst),
	}
}

Expand All @@ -88,13 +88,16 @@ func (dr *DiffResolver) Start() {
Action: DiffResolverEOF,
}
return
} else if !srcOk || (dstOk && src.ObjName > dst.ObjName) {
}

switch {
case !srcOk || (dstOk && src.ObjName > dst.ObjName):
dr.resultCh <- DiffResolverResult{
Action: DiffResolverRecv,
Dst: dst,
}
dst, dstOk = <-dr.dstCh
} else if !dstOk || (srcOk && src.ObjName < dst.ObjName) {
case !dstOk || (srcOk && src.ObjName < dst.ObjName):
remote, err := dr.ctx.IsObjFromRemote(src)
if err != nil {
dr.resultCh <- DiffResolverResult{
Expand All @@ -118,7 +121,7 @@ func (dr *DiffResolver) Start() {
}
}
src, srcOk = <-dr.srcCh
} else { /* s.ObjName == d.ObjName */
default: /* s.ObjName == d.ObjName */
equal, err := dr.ctx.CompareObjects(src, dst)
if err != nil {
dr.resultCh <- DiffResolverResult{
Expand Down Expand Up @@ -200,26 +203,28 @@ func (dr *DiffResolver) walk(job jobif) {
WalkOpts: fs.WalkOpts{CTs: []string{fs.ObjectType}, Sorted: true},
}
opts.WalkOpts.Bck.Copy(job.Bck())
opts.Callback = func(fqn string, de fs.DirEntry) error {
if dr.Stopped() {
return cmn.NewErrAborted(job.String(), "diff-resolver stopped", nil)
}
lom := &cluster.LOM{}
if err := lom.InitFQN(fqn, job.Bck()); err != nil {
return err
}
if !job.checkObj(lom.ObjName) {
return nil
}
dr.PushSrc(lom)
return nil
}
opts.Callback = func(fqn string, _ fs.DirEntry) error { return dr.cb(fqn, job) }

err := fs.WalkBck(opts)
if err != nil && !cmn.IsErrAborted(err) {
dr.Abort(err)
}
}

// cb handles one walked path (it is installed as the walk callback in `walk`):
//   - returns an "aborted" error if the resolver has been stopped;
//   - initializes a LOM from `fqn` within the job's bucket, returning the
//     initialization error, if any;
//   - pushes the LOM as a source element, but only when the job accepts the
//     object name; otherwise the entry is skipped without error.
func (dr *DiffResolver) cb(fqn string, job jobif) error {
	if dr.Stopped() {
		return cmn.NewErrAborted(job.String(), "diff-resolver stopped", nil)
	}

	lom := new(cluster.LOM)
	if err := lom.InitFQN(fqn, job.Bck()); err != nil {
		return err
	}

	// not selected by this job: skip
	if !job.checkObj(lom.ObjName) {
		return nil
	}
	dr.PushSrc(lom)
	return nil
}

func (dr *DiffResolver) push(job jobif, d *dispatcher) {
defer func() {
dr.CloseDst()
Expand Down

0 comments on commit 4486b51

Please sign in to comment.