Skip to content

Commit

Permalink
aws not-found, job status, other cleanup
Browse files Browse the repository at this point in the history
* aws error handling: do not return "NotFound: Not Found" message
* do _not_ return error when
  querying status of a job that had finished with errors
* inc deleted counter only when deleted
* up cli (bump the aistore dependency in cmd/cli/go.mod and go.sum)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 5, 2024
1 parent 65ce7ea commit ab4cccc
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 49 deletions.
32 changes: 16 additions & 16 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo
// AWS bucket may not yet exist in the BMD -
// get the region manually and recreate S3 client.
if region, err = getBucketLocation(svc, cloudBck.Name); err != nil {
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, "")
return
}
// Create new svc with the region details.
if svc, _, err = newClient(sessConf{region: region}, ""); err != nil {
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, "")
return
}
}
Expand All @@ -124,7 +124,7 @@ func (*awsProvider) HeadBucket(_ ctx, bck *meta.Bck) (bckProps cos.StrKVs, errCo
}
versioned, errV := getBucketVersioning(svc, cloudBck)
if errV != nil {
errCode, err = awsErrorToAISError(errV, cloudBck)
errCode, err = awsErrorToAISError(errV, cloudBck, "")
return
}
bckProps[apc.HdrBucketVerEnabled] = strconv.FormatBool(versioned)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Ls
if verbose {
nlog.Infoln("list_objects", cloudBck.Name, err)
}
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, "")
return
}

Expand Down Expand Up @@ -222,7 +222,7 @@ func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Ls
verParams.Prefix = aws.String(entry.Name)
verResp, err := svc.ListObjectVersions(verParams)
if err != nil {
return awsErrorToAISError(err, cloudBck)
return awsErrorToAISError(err, cloudBck, "")
}
for _, vers := range verResp.Versions {
if latest := *(vers.IsLatest); !latest {
Expand All @@ -249,12 +249,12 @@ func (awsp *awsProvider) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.Ls
func (*awsProvider) ListBuckets(cmn.QueryBcks) (bcks cmn.Bcks, errCode int, err error) {
svc, _, err := newClient(sessConf{}, "")
if err != nil {
errCode, err = awsErrorToAISError(err, &cmn.Bck{Provider: apc.AWS})
errCode, err = awsErrorToAISError(err, &cmn.Bck{Provider: apc.AWS}, "")
return
}
result, err := svc.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
errCode, err = awsErrorToAISError(err, &cmn.Bck{Provider: apc.AWS})
errCode, err = awsErrorToAISError(err, &cmn.Bck{Provider: apc.AWS}, "")
return
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func (*awsProvider) HeadObj(_ ctx, lom *core.LOM) (oa *cmn.ObjAttrs, errCode int
Key: aws.String(lom.ObjName),
})
if err != nil {
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, lom.ObjName)
return
}
oa = &cmn.ObjAttrs{}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (*awsProvider) GetObjReader(ctx context.Context, lom *core.LOM) (res core.G
Key: aws.String(lom.ObjName),
})
if err != nil {
res.ErrCode, res.Err = awsErrorToAISError(err, cloudBck)
res.ErrCode, res.Err = awsErrorToAISError(err, cloudBck, lom.ObjName)
return
}

Expand Down Expand Up @@ -444,7 +444,7 @@ func (*awsProvider) PutObj(r io.ReadCloser, lom *core.LOM) (errCode int, err err
Metadata: md,
})
if err != nil {
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, lom.ObjName)
cos.Close(r)
return
}
Expand Down Expand Up @@ -485,7 +485,7 @@ func (*awsProvider) DeleteObj(lom *core.LOM) (errCode int, err error) {
Key: aws.String(lom.ObjName),
})
if err != nil {
errCode, err = awsErrorToAISError(err, cloudBck)
errCode, err = awsErrorToAISError(err, cloudBck, lom.ObjName)
return
}
if superVerbose {
Expand Down Expand Up @@ -613,18 +613,18 @@ func getBucketLocation(svc *s3.S3, bckName string) (region string, err error) {
return
}

func awsErrorToAISError(awsError error, bck *cmn.Bck) (int, error) {
func awsErrorToAISError(awsError error, bck *cmn.Bck, objName string) (int, error) {
reqErr, ok := awsError.(awserr.RequestFailure)
if !ok {
return http.StatusInternalServerError, _awsErr(awsError)
}
awsCode, status := reqErr.Code(), reqErr.StatusCode()
switch awsCode {
case s3.ErrCodeNoSuchBucket:
switch {
case awsCode == s3.ErrCodeNoSuchBucket:
return status, cmn.NewErrRemoteBckNotFound(bck)
case s3.ErrCodeNoSuchKey:
case awsCode == s3.ErrCodeNoSuchKey || (status == http.StatusNotFound && objName != ""):
debug.Assert(status == http.StatusNotFound, status) // expected
fallthrough
return status, errors.New("aws-error[NotFound: " + bck.Cname(objName) + "]")
default:
return status, _awsErr(awsError)
}
Expand Down
12 changes: 4 additions & 8 deletions ais/ic.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ func (ic *ic) xstatusAll(w http.ResponseWriter, r *http.Request, query url.Value
if !msg.Bck.IsEmpty() {
flt.Bck = (*meta.Bck)(&msg.Bck)
}
nls := ic.p.notifs.findAll(flt)

var vec nl.StatusVec

var (
vec nl.StatusVec
nls = ic.p.notifs.findAll(flt)
)
if cos.IsParseBool(query.Get(apc.QparamForce)) {
// (force just-in-time)
// for each args-selected xaction:
Expand Down Expand Up @@ -261,11 +262,6 @@ func (ic *ic) xstatusOne(w http.ResponseWriter, r *http.Request) {
status := nl.Status()
if err := nl.Err(); err != nil {
status.ErrMsg = err.Error()
if !nl.Aborted() {
// TODO -- FIXME: Silent (apc.QparamSilent)
ic.p.writeErrf(w, r, "%s: %v", nl.Name(), err)
return
}
}
b := cos.MustMarshal(status) // TODO: include stats, e.g., progress when ready
w.Header().Set(cos.HdrContentLength, strconv.Itoa(len(b)))
Expand Down
5 changes: 3 additions & 2 deletions ais/test/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,13 @@ func TestDownloadRemote(t *testing.T) {
xid, err = api.EvictMultiObj(baseParams, test.srcBck, expectedObjs, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
status, err := api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
if test.srcBck.Equal(&test.dstBck) {
tassert.CheckFatal(t, err)
} else {
// this time downloaded a different bucket - test.srcBck remained empty
tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects")
tassert.Errorf(t, status.ErrMsg != "", "expecting errors when when not finding listed objects")
}

tlog.Logln("starting remote download...")
Expand Down
7 changes: 4 additions & 3 deletions ais/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,9 @@ func TestSameBucketName(t *testing.T) {
evictListID, err := api.EvictMultiObj(baseParams, bckRemote, files, "" /*template*/)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects")
status, err := api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
tassert.Errorf(t, status.ErrMsg != "", "expecting errors when not finding listed objects")

tlog.Logf("EvictRange\n")
evictRangeID, err := api.EvictMultiObj(baseParams, bckRemote, nil /*lst objnames*/, objRange)
Expand All @@ -412,7 +413,7 @@ func TestSameBucketName(t *testing.T) {
tassert.CheckFatal(t, err)

// Check that ais bucket has 2 objects
tlog.Logf("Validating ais bucket has %s and %s ...\n", fileName1, fileName2)
tlog.Logf("Validating that ais bucket contains %s and %s ...\n", fileName1, fileName2)
_, err = api.HeadObject(baseParams, bckLocal, fileName1, apc.FltPresent, false /*silent*/)
tassert.CheckFatal(t, err)
_, err = api.HeadObject(baseParams, bckLocal, fileName2, apc.FltPresent, false /*silent*/)
Expand Down
4 changes: 2 additions & 2 deletions ais/test/scripts/s3-prefetch-latest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ ais bucket props set $bucket versioning.sync_warm_get=true

echo "12. run 'prefetch --latest' one last time, and make sure the object \"disappears\""
ais prefetch "$bucket/lorem-duis" --latest --wait 2>/dev/null
[[ $? != 0 ]] || { echo "FAIL: expecting blocking 'prefetch' call to return an error, got $?"; exit 1; }
[[ $? == 0 ]] || { echo "FAIL: expecting 'prefetch --wait' to return Ok, got $?"; exit 1; }

echo "13. remote-deleted counter must increment"
echo "13. 'remote-deleted' counter must increment"
cnt5=$(ais show performance counters --regex REMOTE-DEL -H | awk '{sum+=$2;}END{print sum;}')
[[ $cnt5 == $(($cnt4+1)) ]] || { echo "FAIL: $cnt5 != $(($cnt4+1))"; exit 1; }

Expand Down
8 changes: 4 additions & 4 deletions ais/test/xaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func TestXactionAllStatus(t *testing.T) {
}
for _, test := range tests {
for kind := range xact.Table {
xargs := xact.ArgsMsg{Kind: kind, OnlyRunning: test.running}
xargs := xact.ArgsMsg{Kind: kind, OnlyRunning: test.running, Force: test.force}
if mono.NanoTime()&0x1 == 0x1 {
_, xname := xact.GetKindName(kind)
xargs.Kind = xname
}
vec, err := api.GetAllXactionStatus(baseParams, &xargs, test.force)
vec, err := api.GetAllXactionStatus(baseParams, &xargs)
tassert.CheckFatal(t, err)
if len(vec) == 0 {
continue
Expand Down Expand Up @@ -79,8 +79,8 @@ func TestXactionAllStatus(t *testing.T) {
// re-check after a while
time.Sleep(2 * time.Second)

xargs = xact.ArgsMsg{Kind: kind, OnlyRunning: false}
vec, err = api.GetAllXactionStatus(baseParams, &xargs, test.force)
xargs = xact.ArgsMsg{Kind: kind, OnlyRunning: false, Force: test.force}
vec, err = api.GetAllXactionStatus(baseParams, &xargs)
tassert.CheckFatal(t, err)
for _, a := range aborted {
found := false
Expand Down
10 changes: 4 additions & 6 deletions ais/tgtxact.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func (t *target) httpxput(w http.ResponseWriter, r *http.Request) {
}
}
if config := cmn.GCO.Get(); config.FastV(4, cos.SmoduleAIS) {
nlog.Infof("%s %s", msg.Action, xargs.String())
nlog.Infoln(msg.Action, xargs.String())
}
switch msg.Action {
case apc.ActXactStart:
debug.Assert(xact.IsValidKind(xargs.Kind), xargs.String())
if err := t.xstart(r, &xargs, bck); err != nil {
if err := t.xstart(&xargs, bck); err != nil {
t.writeErr(w, r, err)
return
}
Expand Down Expand Up @@ -169,7 +169,7 @@ func (t *target) xquery(w http.ResponseWriter, r *http.Request, what string, xac
// PUT
//

func (t *target) xstart(r *http.Request, args *xact.ArgsMsg, bck *meta.Bck) error {
func (t *target) xstart(args *xact.ArgsMsg, bck *meta.Bck) error {
const erfmb = "global xaction %q does not require bucket (%s) - ignoring it and proceeding to start"
const erfmn = "xaction %q requires a bucket to start"

Expand All @@ -185,11 +185,9 @@ func (t *target) xstart(r *http.Request, args *xact.ArgsMsg, bck *meta.Bck) erro
if bck != nil {
nlog.Errorf(erfmb, args.Kind, bck)
}
q := r.URL.Query()
force := cos.IsParseBool(q.Get(apc.QparamForce)) // NOTE: the only 'force' use case so far
wg := &sync.WaitGroup{}
wg.Add(1)
go t.runLRU(args.ID, wg, force, args.Buckets...)
go t.runLRU(args.ID, wg, args.Force, args.Buckets...)
wg.Wait()
case apc.ActStoreCleanup:
wg := &sync.WaitGroup{}
Expand Down
5 changes: 3 additions & 2 deletions api/apc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ const (
// - implies remote backend
QparamLatestVer = "latest-ver"

QparamSync = "synchronize"
QparamSync = "synchronize" // TODO: in progress

QparamSilent = "sln" // when true, skip nlog.Error* (motivation: can be quite numerous and/or ignorable)
)

// QparamFltPresence enum.
Expand Down Expand Up @@ -142,7 +144,6 @@ const (
QparamNonElectable = "nel" // true: proxy is non-electable for the primary role
QparamUnixTime = "utm" // Unix time since 01/01/70 UTC (nanoseconds)
QparamIsGFNRequest = "gfn" // true if the request is a Get-From-Neighbor
QparamSilent = "sln" // true: skip nlog.Error* (motivation: can be quite, multiple, and/or ignorable)
QparamRebStatus = "rbs" // true: get detailed rebalancing status
QparamRebData = "rbd" // true: get EC rebalance data (pulling data if push way fails)
QparamClusterInfo = "cii" // true: /Health to return cluster info and status
Expand Down
4 changes: 2 additions & 2 deletions api/xaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func GetOneXactionStatus(bp BaseParams, args *xact.ArgsMsg) (status *nl.Status,
}

// same as above, except that it returns _all_ matching xactions
func GetAllXactionStatus(bp BaseParams, args *xact.ArgsMsg, force bool) (matching nl.StatusVec, err error) {
func GetAllXactionStatus(bp BaseParams, args *xact.ArgsMsg) (matching nl.StatusVec, err error) {
q := url.Values{apc.QparamWhat: []string{apc.WhatAllXactStatus}}
if force {
if args.Force {
// (force just-in-time)
// for each args-selected xaction:
// check if any of the targets delayed updating the corresponding status,
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20240104180245-de76ab8d9b18
github.com/NVIDIA/aistore v1.3.22-0.20240104181831-65ce7ead8a6e
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20240104180245-de76ab8d9b18 h1:yCrDHw5bRa95fOfqgcQ+262nJCLGvACZ9oJ6ksxZaEo=
github.com/NVIDIA/aistore v1.3.22-0.20240104180245-de76ab8d9b18/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20240104181831-65ce7ead8a6e h1:QsvQXafaoDpdMbIbVH0w1+N6UQWf8dzN//iAie8I6o0=
github.com/NVIDIA/aistore v1.3.22-0.20240104181831-65ce7ead8a6e/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
3 changes: 2 additions & 1 deletion core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ func (lom *LOM) CheckRemoteMD(rlocked bool) (bool, int, error) {
errDel := lom.Remove(rlocked /*force through rlock*/)
if errDel != nil {
errCode, err = 0, errDel
} else {
g.tstats.Inc(RemoteDeletedDelCount)
}
g.tstats.Inc(RemoteDeletedDelCount)
return false, errCode, err
}

Expand Down

0 comments on commit ab4cccc

Please sign in to comment.