Skip to content

Commit

Permalink
copy/transform --sync: use probabilistic filtering
Browse files Browse the repository at this point in the history
* to skip the remote-version check for objects that were just copied
* prev. commit: 41b7aec
* separately, list-objects:
     - enforce '--check-versions' limitations on the server side

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 18, 2024
1 parent b721797 commit 75d9280
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 34 deletions.
16 changes: 16 additions & 0 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,22 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
p.writeErr(w, r, cmn.NewErrNoNodes(apc.Target, smap.CountTargets()))
return
}
// enforce '--check-versions' limitations
if lsmsg.IsFlagSet(apc.LsVerChanged) {
const a = "cannot perform remote versions check"
if !bck.HasVersioningMD() {
p.writeErrMsg(w, r, a+": bucket "+bck.Cname("")+" does not provide (remote) versioning info")
return
}
if lsmsg.IsFlagSet(apc.LsNameOnly) || lsmsg.IsFlagSet(apc.LsNameSize) {
p.writeErrMsg(w, r, a+": flag 'LsVerChanged' is incompatible with 'LsNameOnly', 'LsNameSize'")
return
}
if !lsmsg.WantProp(apc.GetPropsCustom) {
p.writeErrf(w, r, a+" without listing %q (object property)", apc.GetPropsCustom)
return
}
}

// default props & flags => user-provided message
switch {
Expand Down
6 changes: 3 additions & 3 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,11 +1072,11 @@ func (goi *getOI) transmit(r io.Reader, buf []byte, fqn string) error {
// return special code to indicate just that
return errSendingResp
}
// Update objects which were sent during GFN. Thanks to this we will not
// Update objects sent during GFN. Thanks to this we will not
// have to resend them in rebalance. In case of a race between rebalance
// and GFN, the former wins and it will result in double send.
// and GFN the former wins, resulting in duplicated transmission.
if goi.isGFN {
goi.t.reb.FilterAdd([]byte(goi.lom.Uname()))
goi.t.reb.FilterAdd(cos.UnsafeB(goi.lom.Uname()))
} else if !goi.cold { // GFN & cold-GET: must be already loaded w/ atime set
if err := goi.lom.Load(false /*cache it*/, true /*locked*/); err != nil {
nlog.Errorf("%s: GET post-transmission failure: %v", goi.t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20240112200605-b46d0e1d8f75
github.com/NVIDIA/aistore v1.3.22-0.20240118015351-b721797502aa
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20240112200605-b46d0e1d8f75 h1:tMz2IqmUsQeUwmEB9M6Ug3GQHQr7Hu1yZffLUvhq4BQ=
github.com/NVIDIA/aistore v1.3.22-0.20240112200605-b46d0e1d8f75/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20240118015351-b721797502aa h1:wNgAljV4M8QS7rQU+zQtLzJO2iAoKLOwLZ+lFX5bitg=
github.com/NVIDIA/aistore v1.3.22-0.20240118015351-b721797502aa/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
4 changes: 2 additions & 2 deletions cmn/prob/dyn_cuckoo.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (f *Filter) Reset() {
f.mtx.Lock()
for idx := 0; idx < len(f.filters); idx++ {
f.filters[idx].Reset()
f.filters[idx] = nil
}
f.filters = nil
clear(f.filters)
f.filters = f.filters[:0]
f.mtx.Unlock()
}
24 changes: 18 additions & 6 deletions xact/xs/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/cmn/prob"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
Expand All @@ -31,6 +32,7 @@ type prune struct {
prefix string
// run
joggers *mpather.Jgroup
filter *prob.Filter
same bool
}

Expand All @@ -45,6 +47,7 @@ func (rp *prune) init(config *cmn.Config) {
}
rmopts.Bck.Copy(rp.bckTo.Bucket())
rp.joggers = mpather.NewJoggerGroup(rmopts, config, "")
rp.filter = prob.NewDefaultFilter()
rp.same = rp.bckTo.Equal(rp.bckFrom, true, true)
}

Expand All @@ -61,7 +64,10 @@ func (rp *prune) wait() {
// wait for: joggers || parent-aborted
ticker := time.NewTicker(cmn.Rom.MaxKeepalive())
rp._wait(ticker)

// cleanup
ticker.Stop()
rp.filter.Reset()
}

func (rp *prune) _wait(ticker *time.Ticker) {
Expand All @@ -80,25 +86,31 @@ func (rp *prune) _wait(ticker *time.Ticker) {
}

func (rp *prune) do(dst *core.LOM, _ []byte) error {
var src *core.LOM

// construct src lom
var src *core.LOM
if rp.same {
src = dst
} else {
src = core.AllocLOM(dst.ObjName)
defer core.FreeLOM(src)
if src.InitBck(rp.bckFrom.Bucket()) != nil {
core.FreeLOM(src)
return nil
}
}
_, errCode, err := core.T.Backend(src.Bck()).HeadObj(context.Background(), src)
if !rp.same {
core.FreeLOM(src)

// skip objects already copied by rp.parent (compare w/ reb)
uname := cos.UnsafeB(src.Uname())
if rp.filter.Lookup(uname) {
rp.filter.Delete(uname)
return nil
}

// head
_, errCode, err := core.T.Backend(src.Bck()).HeadObj(context.Background(), src)
if err == nil || !cos.IsNotExist(err, errCode) {
return nil
}

// source does not exist: try to remove the destination (NOTE: best effort)
if !dst.TryLock(true) {
return nil
Expand Down
6 changes: 4 additions & 2 deletions xact/xs/tcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,11 @@ func (r *XactTCB) do(lom *core.LOM, buf []byte) (err error) {
core.FreeCOI(coiParams)
switch {
case err == nil:
// do nothing
if args.Msg.Sync {
r.prune.filter.Insert(cos.UnsafeB(lom.Uname()))
}
case cos.IsNotExist(err, 0):
// ditto
// do nothing
case cos.IsErrOOS(err):
r.Abort(err)
default:
Expand Down
29 changes: 12 additions & 17 deletions xact/xs/wanted_lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,8 @@ func (wi *walkInfo) setWanted(e *cmn.LsoEntry, lom *core.LOM) {
case apc.GetPropsCached: // via obj.SetPresent()

case apc.GetPropsSize:
if lom.Bucket().IsRemote() { // TODO: micro-optimize
if e.Size > 0 && lom.SizeBytes() != e.Size {
e.SetVerChanged()
}
if e.Size > 0 && lom.SizeBytes() != e.Size {
e.SetVerChanged()
}
e.Size = lom.SizeBytes()
case apc.GetPropsVersion:
Expand All @@ -79,19 +77,16 @@ func (wi *walkInfo) setWanted(e *cmn.LsoEntry, lom *core.LOM) {
debug.Assert(false, name)
}
}
if !wi.msg.IsFlagSet(apc.LsVerChanged) || e.IsVerChanged() || !lom.Bucket().IsRemote() /*ditto*/ {
return
}
//
// extensive version-changed check
//
md := cmn.S2CustomMD(custom, version)
if len(md) > 0 {
var oa cmn.ObjAttrs
oa.CustomMD = md
oa.Size = e.Size
if !lom.Equal(&oa) {
e.SetVerChanged()
if wi.msg.IsFlagSet(apc.LsVerChanged) && !e.IsVerChanged() {
// slow path: extensive version-changed check
md := cmn.S2CustomMD(custom, version)
if len(md) > 0 {
var oa cmn.ObjAttrs
oa.CustomMD = md
oa.Size = e.Size
if !lom.Equal(&oa) {
e.SetVerChanged()
}
}
}
}
2 changes: 1 addition & 1 deletion xact/xs/wi_lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type (
// context used to `list` objects in local filesystems
walkInfo struct {
smap *meta.Smap
lomVisitedCb lomVisitedCb
msg *apc.LsoMsg
lomVisitedCb lomVisitedCb
markerDir string
wanted cos.BitFlags
}
Expand Down

0 comments on commit 75d9280

Please sign in to comment.