Skip to content

Commit

Permalink
list-objects to report: 'version-changed', 'deleted'
Browse files Browse the repository at this point in the history
* part four, prev. commit: 671558c

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 12, 2024
1 parent 671558c commit b46d0e1
Show file tree
Hide file tree
Showing 21 changed files with 103 additions and 55 deletions.
2 changes: 1 addition & 1 deletion ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ func (p *proxy) _bckpost(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg
lstcx.tcomsg.TCBMsg = *tcbmsg
xid, err = lstcx.do()
} else {
nlog.Infoln("x-tcb:", bckFrom.String(), bckTo.String())
nlog.Infoln("x-tcb:", bckFrom.String(), "=>", bckTo.String())
xid, err = p.tcb(bckFrom, bckTo, msg, tcbmsg.DryRun)
}
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions api/apc/lsmsg.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package apc: API messages and constants
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package apc

Expand Down Expand Up @@ -72,7 +72,10 @@ const (
// - when listing remote bucket, call backend (`Backend()`) to list non-recursively
LsNoRecursion

// For remote buckets: check for 'version-changed'
// For remote metadata-capable buckets (ie., bck.HasVersioningMD() == true):
// - check whether remote version exists,
// and if it does:
// - check whether remote version differs from its in-cluster copy
LsVerChanged
)

Expand All @@ -95,7 +98,8 @@ const (
EntryInArch = 1 << (EntryStatusBits + 2)
EntryIsDir = 1 << (EntryStatusBits + 3)
EntryIsArchive = 1 << (EntryStatusBits + 4)
EntryVerChanged = 1 << (EntryStatusBits + 5) // (see also: QparamLatestVer, et al.)
EntryVerChanged = 1 << (EntryStatusBits + 5) // see also: QparamLatestVer, et al.
EntryVerRemoved = 1 << (EntryStatusBits + 6) // ditto
)

// ObjEntry.Flags field
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (a *acli) runOnce(args []string) error {
}
if isStartingUp(err) {
for i := 0; i < 4; i++ {
time.Sleep(2 * time.Second)
briefPause(2)
fmt.Fprint(a.app.Writer, ". ")
if err = a.app.Run(args); err == nil {
fmt.Fprintln(a.app.Writer)
Expand Down Expand Up @@ -311,7 +311,7 @@ func (p *longRun) init(c *cli.Context, runOnce bool) {
warn := fmt.Sprintf("option '%s=%d' is invalid (must be >= 1). Proceeding with '%s=%d' (default).",
n, p.count, n, countDefault)
actionWarn(c, warn)
time.Sleep(2 * time.Second)
briefPause(2)
p.count = countDefault
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ var (
verChangedFlag = cli.BoolFlag{
Name: "check-versions",
Usage: "check whether listed remote objects and their in-cluster copies are identical, ie., have the same versions\n" +
indent4 + "\t- applies to remote buckets only\n" +
indent4 + "\t- applies to remote backends that maintain at least some form of versioning information (e.g., version, checksum, ETag)\n" +
indent4 + "\t- see related: 'ais get --latest', 'ais cp --sync', 'ais prefetch --latest'",
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func startDsortHandler(c *cli.Context) (err error) {
_ = teb.Print(config, teb.PropValTmpl)
}
}
time.Sleep(time.Second / 2)
briefPause(1)
fmt.Fprintln(c.App.Writer)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func getHandler(c *cli.Context) error {
return err
}
}
if flagIsSet(c, latestVerFlag) && !bck.IsCloud() && !bck.IsRemoteAIS() {
if flagIsSet(c, latestVerFlag) && !bck.HasVersioningMD() {
return fmt.Errorf("option %s is incompatible with the specified bucket %s\n"+
"(tip: can only GET latest object's version from a bucket with Cloud or remote AIS backend)",
qflprn(latestVerFlag), bck.String())
Expand Down
3 changes: 1 addition & 2 deletions cmd/cli/cli/log_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/NVIDIA/aistore/api"
"github.com/NVIDIA/aistore/api/apc"
Expand Down Expand Up @@ -263,7 +262,7 @@ func _currentLog(c *cli.Context) error {
warn := fmt.Sprintf("run 'ais config node %s inherited %s %s' to change it back",
sname, nodeLogFlushName, config.Log.FlushTime)
actionWarn(c, warn)
time.Sleep(2 * time.Second)
briefPause(2)
fmt.Fprintln(c.App.Writer)
}
}
Expand Down
24 changes: 15 additions & 9 deletions cmd/cli/cli/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,22 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro
if bck.IsRemote() {
addCachedCol = true
msg.SetFlag(apc.LsBckPresent) // default
if flagIsSet(c, verChangedFlag) {
msg.SetFlag(apc.LsVerChanged)
}
if flagIsSet(c, verChangedFlag) {
if bck.IsAIS() {
return fmt.Errorf("flag %s requires remote bucket (have: %s)", qflprn(verChangedFlag), bck)
}
if !bck.HasVersioningMD() {
return fmt.Errorf("flag %s only applies to remote backends that maintain at least some form of versioning information (have: %s)",
qflprn(verChangedFlag), bck)
}
} else if flagIsSet(c, verChangedFlag) {
return fmt.Errorf("flag %s applies to remote buckets only (have: %s)", qflprn(verChangedFlag), bck)
msg.SetFlag(apc.LsVerChanged)
}

if flagIsSet(c, listObjCachedFlag) {
if flagIsSet(c, verChangedFlag) {
return fmt.Errorf(errFmtExclusive, qflprn(verChangedFlag), qflprn(listObjCachedFlag))
actionWarn(c, "checking remote versions may take some time...\n")
briefPause(1)
}
msg.SetFlag(apc.LsObjCached)
addCachedCol = false // redundant
Expand Down Expand Up @@ -344,8 +350,8 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro
// (due to mirroring, EC). The status helps to tell an object from its replica(s).
msg.AddProps(apc.GetPropsStatus)
}
propsStr = msg.Props // show these and only these props
// and finally:
propsStr = msg.Props // show these and _only_ these props
// finally:
if flagIsSet(c, verChangedFlag) {
if !msg.WantProp(apc.GetPropsCustom) {
msg.AddProps(apc.GetPropsCustom)
Expand All @@ -355,7 +361,7 @@ func listObjects(c *cli.Context, bck cmn.Bck, prefix string, listArch bool) erro
}
}

// set page
// set page size, limit
if flagIsSet(c, startAfterFlag) {
msg.StartAfter = parseStrFlag(c, startAfterFlag)
}
Expand Down Expand Up @@ -629,6 +635,6 @@ func (u *_listed) cb(ctx *api.LsoCounter) {
} else if !flagIsSet(u.c, noFooterFlag) {
elapsed := teb.FormatDuration(ctx.Elapsed())
fmt.Fprintf(u.c.App.Writer, "\rListed %s objects in %v\n", cos.FormatBigNum(ctx.Count()), elapsed)
time.Sleep(time.Second)
briefPause(1)
}
}
2 changes: 1 addition & 1 deletion cmd/cli/cli/storage_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (ctx *bsummCtx) progress(summaries *cmn.AllBsummResults, done bool) {

if len(results) > 1 {
if i < len(results)-1 {
time.Sleep(3 * time.Second)
briefPause(3)
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/cli/cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func findClosestCommand(cmd string, candidates []cli.Command) (result string, di
return closestName, minDist
}

func briefPause(seconds time.Duration) {
time.Sleep(seconds * time.Second) //nolint:durationcheck // false positive
}

// Get config from a random target.
func getRandTargetConfig(c *cli.Context) (*cmn.Config, error) {
smap, err := getClusterMap(c)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/xact.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func getKindNameForID(xid string, otherKind ...string) (kind, xname string, rerr
}
if herr, ok := err.(*cmn.ErrHTTP); ok && herr.Status == http.StatusNotFound {
// 2nd attempt assuming xaction in question `IdlesBeforeFinishing`
time.Sleep(time.Second)
briefPause(1)
xs, err := queryXactions(&xargs)
if err != nil {
rerr = err
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20240111185148-4fc8fa04104b
github.com/NVIDIA/aistore v1.3.22-0.20240112195508-875206eec8a8
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20240111185148-4fc8fa04104b h1:aNuTMxkImvqn6LNXdKz7Tjlba4IKjn16+PoLzqFn9gY=
github.com/NVIDIA/aistore v1.3.22-0.20240111185148-4fc8fa04104b/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20240112195508-875206eec8a8 h1:DfJwzy9U5pRbI+V37d8wtJJiSVs2MYFktYjUC0DDsyg=
github.com/NVIDIA/aistore v1.3.22-0.20240112195508-875206eec8a8/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
14 changes: 9 additions & 5 deletions cmd/cli/teb/lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ func LsoTemplate(propsList []string, hideHeader, addCachedCol, addStatusCol bool
func fmtLsObjStatus(e *cmn.LsoEntry) string {
switch e.Status() {
case apc.LocOK:
if e.IsPresent() {
if e.IsVerChanged() {
return "version-changed"
}
if !e.IsPresent() {
return UnknownStatusVal
}
switch {
case e.IsVerChanged():
return fcyan("version-changed")
case e.IsVerRemoved():
return fred("deleted")
default:
return "ok"
}
return UnknownStatusVal
case apc.LocMisplacedNode:
return "misplaced(cluster)"
case apc.LocMisplacedMountpath:
Expand Down
7 changes: 6 additions & 1 deletion cmn/bck.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package cmn provides common constants, types, and utilities for AIS clients
// and AIStore.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package cmn

Expand Down Expand Up @@ -390,6 +390,11 @@ func (b *Bck) IsCloud() bool {
return apc.IsCloudProvider(backend.Provider)
}

// A subset of remote backends that maintain assorted items of versioning information -
// the items including ETag, checksum, etc. - that, in turn, can be used to populate `ObjAttrs`
// * see related: `ObjAttrs.Equal`
func (b *Bck) HasVersioningMD() bool { return b.IsCloud() || b.IsRemoteAIS() }

func (b *Bck) HasProvider() bool { return b.Provider != "" }

//
Expand Down
2 changes: 2 additions & 0 deletions cmn/objlist_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (be *LsoEntry) SetPresent() { be.Flags |= apc.EntryIsCached }
// see also: "latest-ver", QparamLatestVer, et al.
func (be *LsoEntry) SetVerChanged() { be.Flags |= apc.EntryVerChanged }
func (be *LsoEntry) IsVerChanged() bool { return be.Flags&apc.EntryVerChanged != 0 }
func (be *LsoEntry) SetVerRemoved() { be.Flags |= apc.EntryVerRemoved }
func (be *LsoEntry) IsVerRemoved() bool { return be.Flags&apc.EntryVerRemoved != 0 }

func (be *LsoEntry) IsStatusOK() bool { return be.Status() == 0 }
func (be *LsoEntry) Status() uint16 { return be.Flags & apc.EntryStatusMask }
Expand Down
12 changes: 8 additions & 4 deletions core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ remote:

// NOTE:
// - [PRECONDITION]: `versioning.validate_warm_get` || QparamLatestVer
// - caller must take wlock _or_ rlock
// - [Sync] when Sync option is used (both, via bucket configuration and/or `sync` argument) caller MUST take wlock or rlock
// - [MAY] delete remotely-deleted (non-existing) object and increment associated stats counter
func (lom *LOM) CheckRemoteMD(rlocked, sync bool) (bool, int, error) {
func (lom *LOM) CheckRemoteMD(locked, sync bool) (bool /*equal*/, int, error) {
bck := lom.Bck()
if !bck.IsCloud() && !bck.IsRemoteAIS() {
if !bck.HasVersioningMD() {
// nothing to do with: in-cluster ais:// bucket, or a remote one
// that doesn't provide any versioning metadata
return true, 0, nil
Expand All @@ -125,10 +125,14 @@ func (lom *LOM) CheckRemoteMD(rlocked, sync bool) (bool, int, error) {
if errCode == http.StatusNotFound {
err = cos.NewErrNotFound(T, lom.Cname())
}
if !locked {
// return info (neq and, possibly, not-found), and be done
return false, errCode, err
}

// rm remotely-deleted
if cos.IsNotExist(err, errCode) && (lom.VersionConf().Sync || sync) {
errDel := lom.Remove(rlocked /*force through rlock*/)
errDel := lom.Remove(locked /*force through rlock*/)
if errDel != nil {
errCode, err = 0, errDel
} else {
Expand Down
28 changes: 15 additions & 13 deletions core/meta/bck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,21 @@ func (b *Bck) Bucket() *cmn.Bck { return (*cmn.Bck)(b) }
// inline delegations => cmn.Bck
//

func (b *Bck) IsAIS() bool { return (*cmn.Bck)(b).IsAIS() }
func (b *Bck) HasProvider() bool { return (*cmn.Bck)(b).HasProvider() }
func (b *Bck) IsHTTP() bool { return (*cmn.Bck)(b).IsHTTP() }
func (b *Bck) IsHDFS() bool { return (*cmn.Bck)(b).IsHDFS() }
func (b *Bck) IsCloud() bool { return (*cmn.Bck)(b).IsCloud() }
func (b *Bck) IsRemote() bool { return (*cmn.Bck)(b).IsRemote() }
func (b *Bck) IsRemoteAIS() bool { return (*cmn.Bck)(b).IsRemoteAIS() }
func (b *Bck) IsQuery() bool { return (*cmn.Bck)(b).IsQuery() }
func (b *Bck) RemoteBck() *cmn.Bck { return (*cmn.Bck)(b).RemoteBck() }
func (b *Bck) Validate() error { return (*cmn.Bck)(b).Validate() }
func (b *Bck) MakeUname(name string) string { return (*cmn.Bck)(b).MakeUname(name) }
func (b *Bck) Cname(name string) string { return (*cmn.Bck)(b).Cname(name) }
func (b *Bck) IsEmpty() bool { return (*cmn.Bck)(b).IsEmpty() }
func (b *Bck) IsAIS() bool { return (*cmn.Bck)(b).IsAIS() }
func (b *Bck) HasProvider() bool { return (*cmn.Bck)(b).HasProvider() }
func (b *Bck) IsHTTP() bool { return (*cmn.Bck)(b).IsHTTP() }
func (b *Bck) IsHDFS() bool { return (*cmn.Bck)(b).IsHDFS() }
func (b *Bck) IsCloud() bool { return (*cmn.Bck)(b).IsCloud() }
func (b *Bck) IsRemote() bool { return (*cmn.Bck)(b).IsRemote() }
func (b *Bck) IsRemoteAIS() bool { return (*cmn.Bck)(b).IsRemoteAIS() }
func (b *Bck) IsQuery() bool { return (*cmn.Bck)(b).IsQuery() }
func (b *Bck) RemoteBck() *cmn.Bck { return (*cmn.Bck)(b).RemoteBck() }
func (b *Bck) Validate() error { return (*cmn.Bck)(b).Validate() }
func (b *Bck) MakeUname(name string) string { return (*cmn.Bck)(b).MakeUname(name) }
func (b *Bck) Cname(name string) string { return (*cmn.Bck)(b).Cname(name) }
func (b *Bck) IsEmpty() bool { return (*cmn.Bck)(b).IsEmpty() }
func (b *Bck) HasVersioningMD() bool { return (*cmn.Bck)(b).HasVersioningMD() }

func (b *Bck) NewQuery() url.Values { return (*cmn.Bck)(b).NewQuery() }
func (b *Bck) AddToQuery(q url.Values) url.Values { return (*cmn.Bck)(b).AddToQuery(q) }

Expand Down
2 changes: 1 addition & 1 deletion ext/dload/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func headLink(link string) (resp *http.Response, err error) {
// to compare local object with its remote counterpart (source).
func CompareObjects(lom *core.LOM, dst *DstElement) (bool /*equal*/, error) {
if dst.Link == "" {
eq, _, err := lom.CheckRemoteMD(true /*rlocked*/, false /*sync*/) // TODO -- FIXME: use job.Sync()
eq, _, err := lom.CheckRemoteMD(true /*rlocked*/, false /*sync*/) // TODO: use job.Sync()
return eq, err
}

Expand Down
2 changes: 1 addition & 1 deletion xact/xs/lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (r *LsoXact) findToken(token string) uint {
if r.listRemote() && r.token == token {
return 0
}
return uint(sort.Search(len(r.lastPage), func(i int) bool { // TODO -- FIXME: revisit
return uint(sort.Search(len(r.lastPage), func(i int) bool { // TODO: revisit
return !cmn.TokenGreaterEQ(token, r.lastPage[i].Name)
}))
}
Expand Down
28 changes: 23 additions & 5 deletions xact/xs/wi_lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
Expand Down Expand Up @@ -85,15 +86,15 @@ func (wi *walkInfo) match(lom *core.LOM) bool {
if !cmn.ObjHasPrefix(lom.ObjName, wi.msg.Prefix) {
return false
}
if wi.msg.ContinuationToken != "" && cmn.TokenGreaterEQ(wi.msg.ContinuationToken, lom.ObjName) {
return false
}
return true
return wi.msg.ContinuationToken == "" || !cmn.TokenGreaterEQ(wi.msg.ContinuationToken, lom.ObjName)
}

// new entry to be added to the listed page
// new entry to be added to the listed page (note: slow path)
func (wi *walkInfo) ls(lom *core.LOM, status uint16) (e *cmn.LsoEntry) {
e = &cmn.LsoEntry{Name: lom.ObjName, Flags: status | apc.EntryIsCached}
if wi.msg.IsFlagSet(apc.LsVerChanged) {
checkRemoteMD(lom, e)
}
if wi.msg.IsFlagSet(apc.LsNameOnly) {
return
}
Expand All @@ -102,6 +103,23 @@ func (wi *walkInfo) ls(lom *core.LOM, status uint16) (e *cmn.LsoEntry) {
return
}

// NOTE: slow path
func checkRemoteMD(lom *core.LOM, e *cmn.LsoEntry) {
if !lom.Bucket().HasVersioningMD() {
debug.Assert(false, lom.Cname())
return
}
eq, errCode, err := lom.CheckRemoteMD(false /*locked*/, false /*sync*/)
switch {
case eq:
debug.AssertNoErr(err)
case cos.IsNotExist(err, errCode):
e.SetVerRemoved()
default:
e.SetVerChanged()
}
}

// Performs a number of syscalls to load object metadata.
func (wi *walkInfo) callback(fqn string, de fs.DirEntry) (entry *cmn.LsoEntry, err error) {
if de.IsDir() {
Expand Down

0 comments on commit b46d0e1

Please sign in to comment.