Skip to content

Commit

Permalink
multi-object copy/transform with '--sync' option
Browse files Browse the repository at this point in the history
* initial implementation

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 19, 2024
1 parent e679048 commit f6e6e1b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ais/earlystart.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ until:
goto until // repeat
}

// TODO: skip when there's a single target (benign)

// do
var (
msg = &apc.ActMsg{Action: apc.ActRebalance, Value: metaction3}
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
// object
"get": "object get",
"put": "object put",
"rmo": "object rm",
"prefetch": "object prefetch", // same as "job start prefetch"
// bucket
"ls": "bucket ls",
Expand Down
33 changes: 27 additions & 6 deletions xact/xs/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package xs

import (
"context"
"fmt"
"net/http"
"time"

"github.com/NVIDIA/aistore/cmn"
Expand Down Expand Up @@ -37,7 +39,7 @@ type prune struct {
}

func (rp *prune) init(config *cmn.Config) {
debug.Assert(rp.bckFrom.HasVersioningMD(), rp.bckFrom.String())
debug.Assert(rp.bckFrom.IsAIS() || rp.bckFrom.HasVersioningMD(), rp.bckFrom.String())
rmopts := &mpather.JgroupOpts{
CTs: []string{fs.ObjectType},
VisitObj: rp.do,
Expand Down Expand Up @@ -100,18 +102,37 @@ func (rp *prune) do(dst *core.LOM, _ []byte) error {

// skip objects already copied by rp.parent (compare w/ reb)
uname := cos.UnsafeB(src.Uname())
if rp.filter.Lookup(uname) {
if rp.filter != nil && rp.filter.Lookup(uname) { // TODO -- FIXME: rm filter nil check once x-tco supports prob. filtering
rp.filter.Delete(uname)
return nil
}

// head
_, errCode, err := core.T.Backend(src.Bck()).HeadObj(context.Background(), src)
if err == nil || !cos.IsNotExist(err, errCode) {
// check whether src lom exists
var (
err error
errCode int
)
if src.Bck().IsAIS() {
smap := core.T.Sowner().Get()
tsi, errV := smap.HrwHash2T(src.Digest())
if errV != nil {
return fmt.Errorf("prune %s: fatal err: %w", rp.parent.Name(), errV)
}
if tsi.ID() == core.T.SID() {
err = src.Load(false, false)
} else {
if present := core.T.HeadObjT2T(src, tsi); !present {
errCode = http.StatusNotFound
}
}
} else {
_, errCode, err = core.T.Backend(src.Bck()).HeadObj(context.Background(), src)
}
if err == nil || !cos.IsNotExist(err, errCode) /*not complaining*/ {
return nil
}

// source does not exist: try to remove the destination (NOTE: best effort)
// source does not exist: try to remove the destination (NOTE best effort)
if !dst.TryLock(true) {
return nil
}
Expand Down
53 changes: 52 additions & 1 deletion xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,23 @@ func (r *XactTCObjs) Run(wg *sync.WaitGroup) {
nat := smap.CountActiveTs()
wi.refc.Store(int32(nat - 1))

// run
var wg *sync.WaitGroup
if err = lrit.init(r, &msg.ListRange, r.Bck()); err == nil {
if msg.Sync && lrit.lrp != lrpList {
wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
r.prune(lrit, smap)
wg.Done()
}()
}
err = lrit.run(wi, smap)
}
if wg != nil {
wg.Wait()
}

if r.IsAborted() || err != nil {
goto fin
}
Expand Down Expand Up @@ -322,6 +336,43 @@ func (wi *tcowi) do(lom *core.LOM, lrit *lriterator) {
wi.r.AddErr(err, 5, cos.SmoduleXs)
}
} else if cmn.Rom.FastV(5, cos.SmoduleXs) {
nlog.Infof("%s: tco-lr %s => %s", wi.r.Base.Name(), lom.Cname(), wi.r.args.BckTo.Cname(objNameTo))
nlog.Infoln(wi.r.Name()+":", lom.Cname(), "=>", wi.r.args.BckTo.Cname(objNameTo))
}
}

//
// remove objects not present at the source (when synchronizing bckFrom => bckTo)
// TODO: probabilistic filtering
//

type syncwi struct {
rp *prune
}

// interface guard
var _ lrwi = (*syncwi)(nil)

func (r *XactTCObjs) prune(lrit *lriterator, smap *meta.Smap) {
rp := prune{parent: r}
rp.bckFrom, rp.bckTo = r.FromTo()

// tcb use case
if lrit.lrp == lrpPrefix {
rp.prefix = lrit.pt.Prefix
rp.init(r.config)
rp.run()
rp.wait()
return
}

// same range iterator but different bucket
debug.Assert(lrit.lrp == lrpRange)
syncit := *lrit
syncit.bck = rp.bckTo
syncwi := &syncwi{&rp} // reusing only prune.do (and not init/run/wait)
syncit.run(syncwi, smap)
}

func (syncwi *syncwi) do(lom *core.LOM, _ *lriterator) {
syncwi.rp.do(lom, nil)
}

0 comments on commit f6e6e1b

Please sign in to comment.