diff --git a/ais/backend/aws.go b/ais/backend/aws.go index a054703380b..f3655e565f5 100644 --- a/ais/backend/aws.go +++ b/ais/backend/aws.go @@ -622,7 +622,7 @@ func awsErrorToAISError(awsError error, bck *cmn.Bck) (int, error) { case s3.ErrCodeNoSuchBucket: return status, cmn.NewErrRemoteBckNotFound(bck) case s3.ErrCodeNoSuchKey: - debug.Assert(status == http.StatusNotFound) // expected + debug.Assert(status == http.StatusNotFound, status) // expected fallthrough default: return status, _awsErr(awsError) diff --git a/ais/test/downloader_test.go b/ais/test/downloader_test.go index 06b6dcb0ca6..7d875be07f6 100644 --- a/ais/test/downloader_test.go +++ b/ais/test/downloader_test.go @@ -549,7 +549,7 @@ func TestDownloadRemote(t *testing.T) { } tlog.Logf("(1) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck) - xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs) + xid, err := api.EvictMultiObj(baseParams, test.srcBck, expectedObjs, "" /*template*/) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -581,7 +581,7 @@ func TestDownloadRemote(t *testing.T) { // Test cancellation tlog.Logf("(2) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck) - xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs) + xid, err = api.EvictMultiObj(baseParams, test.srcBck, expectedObjs, "" /*template*/) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) diff --git a/ais/test/integration_test.go b/ais/test/integration_test.go index 3b6bf4ea520..2e6a5ee6329 100644 --- a/ais/test/integration_test.go +++ b/ais/test/integration_test.go @@ -1369,7 +1369,7 @@ func TestAtimePrefetch(t *testing.T) { for obj := range nameCh { objs = append(objs, obj) } - xid, err := api.EvictList(baseParams, bck, objs) + xid, err := api.EvictMultiObj(baseParams, bck, objs, "" /*template*/) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -1377,11 +1377,15 @@ func TestAtimePrefetch(t *testing.T) { timeAfterPut := time.Now() - xid, err = api.PrefetchList(baseParams, bck, objs) - tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, &args) - tassert.CheckFatal(t, err) + { + var msg apc.PrefetchMsg + msg.ObjNames = objs + xid, err = api.Prefetch(baseParams, bck, msg) + tassert.CheckFatal(t, err) + args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} + _, err = api.WaitForXactionIC(baseParams, &args) + tassert.CheckFatal(t, err) + } timeFormat := time.RFC3339Nano msg := &apc.LsoMsg{Props: apc.GetPropsAtime, TimeFormat: timeFormat, Prefix: objPath} diff --git a/ais/test/object_test.go b/ais/test/object_test.go index 8a886d8f24d..7c9852acd84 100644 --- a/ais/test/object_test.go +++ b/ais/test/object_test.go @@ -356,32 +356,40 @@ func TestSameBucketName(t *testing.T) { } tlog.Logf("PrefetchList num=%d\n", len(files)) - prefetchListID, err := api.PrefetchList(baseParams, bckRemote, files) - tassert.CheckFatal(t, err) - args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, &args) - tassert.CheckFatal(t, err) + { + var msg apc.PrefetchMsg + msg.ObjNames = files + prefetchListID, err := api.Prefetch(baseParams, bckRemote, msg) + tassert.CheckFatal(t, err) + args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} + _, err = api.WaitForXactionIC(baseParams, &args) + tassert.CheckFatal(t, err) + } tlog.Logf("PrefetchRange %s\n", objRange) - prefetchRangeID, err := api.PrefetchRange(baseParams, bckRemote, objRange) - tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, &args) - tassert.CheckFatal(t, err) + { + var msg apc.PrefetchMsg + msg.Template = objRange + prefetchRangeID, err := api.Prefetch(baseParams, bckRemote, msg) + tassert.CheckFatal(t, err) + args := xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} + _, err = api.WaitForXactionIC(baseParams, &args) + tassert.CheckFatal(t, err) + } // delete one obj from remote, and check evictions (below) err = api.DeleteObject(baseParams, bckRemote, fileName1) tassert.CheckFatal(t, err) tlog.Logf("EvictList %v\n", files) - evictListID, err := api.EvictList(baseParams, bckRemote, files) + evictListID, err := api.EvictMultiObj(baseParams, bckRemote, files, "" /*template*/) tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} + args := xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects") tlog.Logf("EvictRange\n") - evictRangeID, err := api.EvictRange(baseParams, bckRemote, objRange) + evictRangeID, err := api.EvictMultiObj(baseParams, bckRemote, nil /*lst objnames*/, objRange) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: evictRangeID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -411,13 +419,17 @@ func TestSameBucketName(t *testing.T) { tassert.CheckFatal(t, err) // Prefetch/Evict should work - prefetchListID, err = api.PrefetchList(baseParams, bckRemote, files) - tassert.CheckFatal(t, err) - args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, &args) - tassert.CheckFatal(t, err) + { + var msg apc.PrefetchMsg + msg.ObjNames = files + prefetchListID, err := api.Prefetch(baseParams, bckRemote, msg) + tassert.CheckFatal(t, err) + args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} + _, err = api.WaitForXactionIC(baseParams, &args) + tassert.CheckFatal(t, err) + } - evictListID, err = api.EvictList(baseParams, bckRemote, files) + evictListID, err = api.EvictMultiObj(baseParams, bckRemote, files, "" /*template*/) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -425,7 +437,7 @@ func TestSameBucketName(t *testing.T) { // Delete from cloud bucket tlog.Logf("Deleting %s and %s from cloud bucket ...\n", fileName1, fileName2) - deleteID, err := api.DeleteList(baseParams, bckRemote, files) + deleteID, err := api.DeleteMultiObj(baseParams, bckRemote, files, "" /*template*/) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -433,7 +445,7 @@ func TestSameBucketName(t *testing.T) { // Delete from ais bucket tlog.Logf("Deleting %s and %s from ais bucket ...\n", fileName1, fileName2) - deleteID, err = api.DeleteList(baseParams, bckLocal, files) + deleteID, err = api.DeleteMultiObj(baseParams, bckLocal, files, "" /*template*/) tassert.CheckFatal(t, err) args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -1548,11 +1560,11 @@ func TestOperationsWithRanges(t *testing.T) { msg = &apc.LsoMsg{Prefix: "test/"} ) if evict { - xid, err = api.EvictRange(baseParams, b, test.rangeStr) + xid, err = api.EvictMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr) msg.Flags = apc.LsObjCached kind = apc.ActEvictObjects } else { - xid, err = api.DeleteRange(baseParams, b, test.rangeStr) + xid, err = api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr) kind = apc.ActDeleteObjects } if err != nil { diff --git a/ais/test/objprops_test.go b/ais/test/objprops_test.go index 0025829ed6e..47fbda96fa1 100644 --- a/ais/test/objprops_test.go +++ b/ais/test/objprops_test.go @@ -129,7 +129,7 @@ func propsEvict(t *testing.T, proxyURL string, bck cmn.Bck, objMap map[string]st } baseParams := tools.BaseAPIParams(proxyURL) - xid, err := api.EvictList(baseParams, bck, toEvictList) + xid, err := api.EvictMultiObj(baseParams, bck, toEvictList, "" /*template*/) if err != nil { t.Errorf("Failed to evict objects: %v\n", err) } diff --git a/ais/test/regression_test.go b/ais/test/regression_test.go index c93943b4359..877f54b0746 100644 --- a/ais/test/regression_test.go +++ b/ais/test/regression_test.go @@ -745,7 +745,7 @@ func TestPrefetchList(t *testing.T) { // 2. Evict those objects from the cache and prefetch them tlog.Logf("Evicting and prefetching %d objects\n", len(m.objNames)) - xid, err := api.EvictList(baseParams, bck, m.objNames) + xid, err := api.EvictMultiObj(baseParams, bck, m.objNames, "" /*template*/) if err != nil { t.Error(err) } @@ -755,9 +755,13 @@ func TestPrefetchList(t *testing.T) { tassert.CheckFatal(t, err) // 3. Prefetch evicted objects - xid, err = api.PrefetchList(baseParams, bck, m.objNames) - if err != nil { - t.Error(err) + { + var msg apc.PrefetchMsg + msg.ObjNames = m.objNames + xid, err = api.Prefetch(baseParams, bck, msg) + if err != nil { + t.Error(err) + } } args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} @@ -817,7 +821,7 @@ func TestDeleteList(t *testing.T) { tlog.Logf("PUT done.\n") // 2. Delete the objects - xid, err := api.DeleteList(baseParams, b, files) + xid, err := api.DeleteMultiObj(baseParams, b, files, "" /*template*/) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} @@ -875,17 +879,21 @@ func TestPrefetchRange(t *testing.T) { // 3. Evict those objects from the cache, and then prefetch them rng := fmt.Sprintf("%s%s", m.prefix, prefetchRange) tlog.Logf("Evicting and prefetching %d objects (range: %s)\n", len(files), rng) - xid, err := api.EvictRange(baseParams, bck, rng) + xid, err := api.EvictMultiObj(baseParams, bck, nil /*lst objnames*/, rng) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) tassert.CheckFatal(t, err) - xid, err = api.PrefetchRange(baseParams, bck, rng) - tassert.CheckError(t, err) - args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} - _, err = api.WaitForXactionIC(baseParams, &args) - tassert.CheckFatal(t, err) + { + var msg apc.PrefetchMsg + msg.Template = rng + xid, err = api.Prefetch(baseParams, bck, msg) + tassert.CheckError(t, err) + args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout} + _, err = api.WaitForXactionIC(baseParams, &args) + tassert.CheckFatal(t, err) + } // 4. Ensure all done xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout} @@ -954,7 +962,7 @@ func TestDeleteRange(t *testing.T) { // 2. Delete the small range of objects tlog.Logf("Delete in range %s\n", smallrange) - xid, err := api.DeleteRange(baseParams, b, smallrange) + xid, err := api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, smallrange) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -983,7 +991,7 @@ func TestDeleteRange(t *testing.T) { tlog.Logf("Delete in range %s\n", bigrange) // 4. Delete the big range of objects - xid, err = api.DeleteRange(baseParams, b, bigrange) + xid, err = api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, bigrange) tassert.CheckError(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -1060,7 +1068,7 @@ func TestStressDeleteRange(t *testing.T) { // 2. Delete a range of objects tlog.Logf("Deleting objects in range: %s\n", partialRange) - xid, err := api.DeleteRange(baseParams, bck, partialRange) + xid, err := api.DeleteMultiObj(baseParams, bck, nil /*lst objnames*/, partialRange) tassert.CheckError(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) @@ -1092,7 +1100,7 @@ func TestStressDeleteRange(t *testing.T) { // 4. Delete the entire range of objects tlog.Logf("Deleting objects in range: %s\n", fullRange) - xid, err = api.DeleteRange(baseParams, bck, fullRange) + xid, err = api.DeleteMultiObj(baseParams, bck, nil /*lst objnames*/, fullRange) tassert.CheckError(t, err) args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout} _, err = api.WaitForXactionIC(baseParams, &args) diff --git a/ais/test/scripts/s3-prefetch-latest.sh b/ais/test/scripts/s3-prefetch-latest.sh new file mode 100755 index 00000000000..c8900e5b9f5 --- /dev/null +++ b/ais/test/scripts/s3-prefetch-latest.sh @@ -0,0 +1,98 @@ +#!/bin/bash + +## Prerequisites: ################################################################################# +# - s3 bucket +# - s3cmd, $PATH-executable and configured to access the bucket out-of-band +# - aistore cluster, also configured to access the same bucket +# +## Example usage: +## ./ais/test/scripts/s3-prefetch-latest.sh --bucket s3://abc ######################## + +lorem='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.' + +duis='Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Et harum quidem..' + +## Command line options (and their respective defaults) +bucket="s3://abc" + +## constants +sum1="xxhash[ad97df912d23103f]" +sum2="xxhash[ecb5ed42299ea74d]" + +host="--host=s3.amazonaws.com" + +while (( "$#" )); do + case "${1}" in + --bucket) bucket=$2; shift; shift;; + *) echo "fatal: unknown argument '${1}'"; exit 1;; + esac +done + +## uncomment for verbose output +set -x ## DEBUG + +## establish existence +ais show bucket $bucket -c 1>/dev/null || exit $? + +cleanup() { + rc=$? + ais object rm "$bucket/lorem-duis" 1>/dev/null 2>&1 + exit $rc +} + +trap cleanup EXIT INT TERM + +echo -e +ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)" +echo -e + +echo "1. out-of-band PUT: first version" +echo $lorem | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $? + +echo "2. prefetch, and check" +ais prefetch "$bucket/lorem-duis" +sleep 5 ############################# DEBUG +checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') +[[ "$checksum" == "$sum1" ]] || { echo "FAIL: $checksum != $sum1"; exit 1; } + +echo "3. out-of-band PUT: 2nd version (overwrite)" +echo $duis | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $? + +echo "4. prefetch and check (expecting the first version's checksum)" +ais prefetch "$bucket/lorem-duis" +sleep 5 ############################# DEBUG +checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') +[[ "$checksum" != "$sum2" ]] || { echo "FAIL: $checksum == $sum2"; exit 1; } + +echo "5. query cold-get count (statistics)" +cnt1=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}') + +echo "6. prefetch latest: detect version change and trigger cold GET" +ais prefetch "$bucket/lorem-duis" --latest +sleep 5 ############################# DEBUG +checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') +[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; } + +echo "7. cold-get counter must increment" +cnt2=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}') +[[ $cnt2 == $(($cnt1+1)) ]] || { echo "FAIL: $cnt2 != $(($cnt1+1))"; exit 1; } + +echo "8. warm GET must remain \"warm\" and cold-get-count must not increment" +ais get "$bucket/lorem-duis" /dev/null 1>/dev/null +checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}') +[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; } + +cnt3=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}') +[[ $cnt3 == $cnt2 ]] || { echo "FAIL: $cnt3 != $cnt2"; exit 1; } + +echo "9. out-of-band DELETE" +s3cmd del "$bucket/lorem-duis" $host 1>/dev/null || exit $? + +echo "10. warm GET must (silently) trigger deletion" +ais get "$bucket/lorem-duis" /dev/null --silent 1>/dev/null 2>&1 +[[ $? != 0 ]] || { echo "FAIL: expecting GET error, got $?"; exit 1; } +ais ls "$bucket/lorem-duis" --cached --silent -H 2>/dev/null +[[ $? != 0 ]] || { echo "FAIL: expecting 'show object' error, got $?"; exit 1; } + +echo -e +ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)" diff --git a/api/multiobj.go b/api/multiobj.go index 8cddc673f7e..0e59c6ad6f7 100644 --- a/api/multiobj.go +++ b/api/multiobj.go @@ -47,51 +47,23 @@ func ETLMultiObj(bp BaseParams, bckFrom cmn.Bck, msg *cmn.TCObjsMsg, fltPresence return dolr(bp, bckFrom, apc.ActETLObjects, msg, q) } -// DeleteList sends request to remove a list of objects from a bucket. -func DeleteList(bp BaseParams, bck cmn.Bck, filesList []string) (string, error) { +func DeleteMultiObj(bp BaseParams, bck cmn.Bck, objNames []string, template string) (string, error) { bp.Method = http.MethodDelete q := bck.NewQuery() - msg := apc.ListRange{ObjNames: filesList} + msg := apc.ListRange{ObjNames: objNames, Template: template} return dolr(bp, bck, apc.ActDeleteObjects, msg, q) } -// DeleteRange sends request to remove a range of objects from a bucket. -func DeleteRange(bp BaseParams, bck cmn.Bck, rng string) (string, error) { +func EvictMultiObj(bp BaseParams, bck cmn.Bck, objNames []string, template string) (string, error) { bp.Method = http.MethodDelete q := bck.NewQuery() - msg := apc.ListRange{Template: rng} - return dolr(bp, bck, apc.ActDeleteObjects, msg, q) -} - -// EvictList sends request to evict a list of objects from a remote bucket. -func EvictList(bp BaseParams, bck cmn.Bck, fileslist []string) (string, error) { - bp.Method = http.MethodDelete - q := bck.NewQuery() - msg := apc.ListRange{ObjNames: fileslist} + msg := apc.ListRange{ObjNames: objNames, Template: template} return dolr(bp, bck, apc.ActEvictObjects, msg, q) } -// EvictRange sends request to evict a range of objects from a remote bucket. -func EvictRange(bp BaseParams, bck cmn.Bck, rng string) (string, error) { - bp.Method = http.MethodDelete - q := bck.NewQuery() - msg := apc.ListRange{Template: rng} - return dolr(bp, bck, apc.ActEvictObjects, msg, q) -} - -// PrefetchList sends request to prefetch a list of objects from a remote bucket. -func PrefetchList(bp BaseParams, bck cmn.Bck, objNames []string) (string, error) { - bp.Method = http.MethodPost - q := bck.NewQuery() - msg := apc.ListRange{ObjNames: objNames} - return dolr(bp, bck, apc.ActPrefetchObjects, msg, q) -} - -// PrefetchRange sends request to prefetch a range of objects from a remote bucket. -func PrefetchRange(bp BaseParams, bck cmn.Bck, rng string) (string, error) { +func Prefetch(bp BaseParams, bck cmn.Bck, msg apc.PrefetchMsg) (string, error) { bp.Method = http.MethodPost q := bck.NewQuery() - msg := apc.ListRange{Template: rng} return dolr(bp, bck, apc.ActPrefetchObjects, msg, q) } diff --git a/api/object.go b/api/object.go index edcc142a79a..b4e216564e1 100644 --- a/api/object.go +++ b/api/object.go @@ -400,11 +400,12 @@ func EvictObject(bp BaseParams, bck cmn.Bck, objName string) error { return err } -// prefetch object -// - convenience method added also for "symmetry" with the evict (above) +// prefetch object - a convenience method added for "symmetry" with the evict (above) // - compare with api.PrefetchList and api.PrefetchRange func PrefetchObject(bp BaseParams, bck cmn.Bck, objName string) (string, error) { - return PrefetchList(bp, bck, []string{objName}) + var msg apc.PrefetchMsg + msg.ObjNames = []string{objName} + return Prefetch(bp, bck, msg) } // PutObject PUTs the specified reader (`args.Reader`) as a new object diff --git a/bench/tools/aisloader/run.go b/bench/tools/aisloader/run.go index 34d29320849..64fe59625bc 100644 --- a/bench/tools/aisloader/run.go +++ b/bench/tools/aisloader/run.go @@ -1081,7 +1081,7 @@ func cleanupObjs(objs []string, wg *sync.WaitGroup) { b := min(t, runParams.batchSize) n := t / b for i := 0; i < n; i++ { - xid, err := api.DeleteList(runParams.bp, runParams.bck, objs[i*b:(i+1)*b]) + xid, err := api.DeleteMultiObj(runParams.bp, runParams.bck, objs[i*b:(i+1)*b], "" /*template*/) if err != nil { fmt.Println("delete err ", err) } @@ -1092,7 +1092,7 @@ func cleanupObjs(objs []string, wg *sync.WaitGroup) { } if t%b != 0 { - xid, err := api.DeleteList(runParams.bp, runParams.bck, objs[n*b:]) + xid, err := api.DeleteMultiObj(runParams.bp, runParams.bck, objs[n*b:], "" /*template*/) if err != nil { fmt.Println("delete err ", err) } diff --git a/cmd/cli/cli/bucket.go b/cmd/cli/cli/bucket.go index fb8498e6d2e..d022dd6a7e3 100644 --- a/cmd/cli/cli/bucket.go +++ b/cmd/cli/cli/bucket.go @@ -145,14 +145,14 @@ func evictBucket(c *cli.Context, bck cmn.Bck) error { } func _evictBck(c *cli.Context, bck cmn.Bck) (err error) { - if err = ensureHasProvider(bck); err != nil { - return + if err = ensureRemoteProvider(bck); err != nil { + return err } if err = api.EvictRemoteBucket(apiBP, bck, flagIsSet(c, keepMDFlag)); err != nil { - return + return V(err) } actionDone(c, "Evicted bucket "+bck.Cname("")+" from aistore") - return + return nil } func listOrSummBuckets(c *cli.Context, qbck cmn.QueryBcks, lsb lsbCtx) error { diff --git a/cmd/cli/cli/const.go b/cmd/cli/cli/const.go index e17744189d1..ffdc20f0d72 100644 --- a/cmd/cli/cli/const.go +++ b/cmd/cli/cli/const.go @@ -473,9 +473,9 @@ var ( latestVersionFlag = cli.BoolFlag{ Name: "latest", - Usage: "GET, prefetch, or copy the latest object version from the associated remote bucket:\n" + - indent1 + "\t- allows fine-grained (operation-level) control without changing bucket configuration\n" + - indent1 + "\t- see also: 'ais bucket props ... versioning.validate_warm_get'", + Usage: "GET, prefetch, or copy the latest object version from the associated remote bucket;\n" + + indent1 + "\tallows operation-level control without changing bucket configuration\n" + + indent1 + "\t(see also: 'ais bucket props ... versioning.validate_warm_get')", } averageSizeFlag = cli.BoolFlag{Name: "average-size", Usage: "show average GET, PUT, etc. request size"} diff --git a/cmd/cli/cli/multiobj.go b/cmd/cli/cli/multiobj.go index ed50fd63190..a1f24c41407 100644 --- a/cmd/cli/cli/multiobj.go +++ b/cmd/cli/cli/multiobj.go @@ -23,6 +23,11 @@ import ( const dryRunExamplesCnt = 10 +type lrCtx struct { + listObjs, tmplObjs string + bck cmn.Bck +} + // x-TCO: multi-object transform or copy func runTCO(c *cli.Context, bckFrom, bckTo cmn.Bck, listObjs, tmplObjs, etlName string) error { var ( @@ -118,169 +123,6 @@ func runTCO(c *cli.Context, bckFrom, bckTo cmn.Bck, listObjs, tmplObjs, etlName return err } -// evict, rm, prefetch -func listrange(c *cli.Context, bck cmn.Bck, listObjs, tmplObjs string) (err error) { - var ( - xid, xname string - text string - num int64 - ) - if listObjs != "" { - xid, xname, text, num, err = _listOp(c, bck, listObjs) - } else { - xid, xname, text, num, err = _rangeOp(c, bck, tmplObjs) - } - if err != nil { - return - } - - // progress bar - showProgress := flagIsSet(c, progressFlag) - if showProgress { - var cpr = cprCtx{ - xname: xname, - xid: xid, - from: bck.Cname(""), - loghdr: text, - } - cpr.totals.objs = num - return cpr.multiobj(c, text) - } - - // otherwise, wait or exit - if !flagIsSet(c, waitFlag) && !flagIsSet(c, waitJobXactFinishedFlag) { - if xid != "" { - text += ". " + toMonitorMsg(c, xid, "") - } - fmt.Fprintln(c.App.Writer, text) - return - } - - // wait - var timeout time.Duration - if flagIsSet(c, waitJobXactFinishedFlag) { - timeout = parseDurationFlag(c, waitJobXactFinishedFlag) - } - fmt.Fprintln(c.App.Writer, text+" ...") - xargs := xact.ArgsMsg{ID: xid, Kind: xname, Timeout: timeout} - if err := waitXact(apiBP, &xargs); err != nil { - return err - } - fmt.Fprint(c.App.Writer, fmtXactSucceeded) - return -} - -// `--list` flag -func _listOp(c *cli.Context, bck cmn.Bck, arg string) (xid, xname, text string, num int64, err error) { - var ( - kind string - fileList = splitCsv(arg) - ) - if flagIsSet(c, dryRunFlag) { - limitedLineWriter(c.App.Writer, - dryRunExamplesCnt, strings.ToUpper(c.Command.Name)+" "+bck.Cname("")+"/%s\n", fileList) - return - } - var action string - switch c.Command.Name { - case commandRemove: - xid, err = api.DeleteList(apiBP, bck, fileList) - kind = apc.ActDeleteObjects - action = "rm" - case commandPrefetch: - if err = ensureHasProvider(bck); err != nil { - return - } - xid, err = api.PrefetchList(apiBP, bck, fileList) - kind = apc.ActPrefetchObjects - action = "prefetch" - case commandEvict: - if err = ensureHasProvider(bck); err != nil { - return - } - xid, err = api.EvictList(apiBP, bck, fileList) - kind = apc.ActEvictObjects - action = "evict" - default: - debug.Assert(false, c.Command.Name) - return - } - if err != nil { - return - } - num = int64(len(fileList)) - s := fmt.Sprintf("%v", fileList) - if num > 4 { - s = fmt.Sprintf("%v...", fileList[:4]) - } - _, xname = xact.GetKindName(kind) - text = fmt.Sprintf("%s[%s]: %s %s from %s", xname, xid, s, action, bck.Cname("")) - return -} - -// `--range` flag -func _rangeOp(c *cli.Context, bck cmn.Bck, rangeStr string) (xid, xname, text string, num int64, err error) { - var ( - kind string - pt cos.ParsedTemplate - emptyTemplate bool - ) - pt, err = cos.NewParsedTemplate(rangeStr) // NOTE: prefix w/ no range is fine - if err != nil { - if err != cos.ErrEmptyTemplate { - fmt.Fprintf(c.App.Writer, "invalid template %q: %v\n", rangeStr, err) - return - } - err, emptyTemplate = nil, true - } - // [DRY-RUN] - if flagIsSet(c, dryRunFlag) { - objs := pt.ToSlice(dryRunExamplesCnt) - limitedLineWriter(c.App.Writer, - dryRunExamplesCnt, strings.ToUpper(c.Command.Name)+" "+bck.Cname("")+"/%s", objs) - if pt.Count() > dryRunExamplesCnt { - fmt.Fprintf(c.App.Writer, "(and %d more)\n", pt.Count()-dryRunExamplesCnt) - } - return - } - - var action string - switch c.Command.Name { - case commandRemove: - xid, err = api.DeleteRange(apiBP, bck, rangeStr) - kind = apc.ActDeleteObjects - action = "rm" - case commandPrefetch: - if err = ensureHasProvider(bck); err != nil { - return - } - xid, err = api.PrefetchRange(apiBP, bck, rangeStr) - kind = apc.ActPrefetchObjects - action = "prefetch" - case commandEvict: - if err = ensureHasProvider(bck); err != nil { - return - } - xid, err = api.EvictRange(apiBP, bck, rangeStr) - kind = apc.ActEvictObjects - action = "evict" - default: - debug.Assert(false, c.Command.Name) - return - } - if err != nil { - return - } - num = pt.Count() - _, xname = xact.GetKindName(kind) - if emptyTemplate { - text = fmt.Sprintf("%s[%s]: %s entire bucket %s", xname, xid, action, bck.Cname("")) - } else { - text = fmt.Sprintf("%s[%s]: %s %q from %s", xname, xid, action, rangeStr, bck.Cname("")) - } - return xid, xname, text, num, nil -} - // // evict, rm, prefetch ------------------------------------------------------------------------ // @@ -324,7 +166,8 @@ func _evictOne(c *cli.Context, shift int) error { switch { case listObjs != "" || tmplObjs != "": // 1. multi-obj - return listrange(c, bck, listObjs, tmplObjs) + lrCtx := &lrCtx{listObjs, tmplObjs, bck} + return lrCtx.do(c) case objName == "": // 2. entire bucket return evictBucket(c, bck) default: // 3. one(?) obj to evict @@ -380,7 +223,8 @@ func _rmOne(c *cli.Context, shift int) error { switch { case listObjs != "" || tmplObjs != "": // 1. multi-obj - return listrange(c, bck, listObjs, tmplObjs) + lrCtx := &lrCtx{listObjs, tmplObjs, bck} + return lrCtx.do(c) case objName == "": // 2. all objects if flagIsSet(c, rmrfFlag) { if !flagIsSet(c, yesFlag) { @@ -450,5 +294,149 @@ func _prefetchOne(c *cli.Context, shift int) error { if listObjs == "" && tmplObjs == "" { listObjs = objName } - return listrange(c, bck, listObjs, tmplObjs) // NOTE: empty tmplObjs means "all objects" + lrCtx := &lrCtx{listObjs, tmplObjs, bck} + return lrCtx.do(c) +} + +// +// lrCtx: evict, rm, prefetch +// + +func (lr *lrCtx) do(c *cli.Context) (err error) { + var ( + fileList []string + kind string + pt cos.ParsedTemplate + emptyTemplate bool + ) + // 1. parse + if lr.listObjs != "" { + fileList = splitCsv(lr.listObjs) + } else { + pt, err = cos.NewParsedTemplate(lr.tmplObjs) // NOTE: prefix w/ no range is fine + if err != nil { + if err != cos.ErrEmptyTemplate { + fmt.Fprintf(c.App.Writer, "invalid template %q: %v\n", lr.tmplObjs, err) + return + } + err, emptyTemplate = nil, true // NOTE: empty tmplObjs means "all objects" + } + } + + // 2. [DRY-RUN] + if flagIsSet(c, dryRunFlag) { + lr.dry(c, fileList, &pt) + return + } + + // 3. do + xid, kind, action, errV := lr._do(c, fileList) + if err != nil { + return V(errV) + } + + // 4. format + var ( + xname, text string + num int64 + ) + if lr.listObjs != "" { + num = int64(len(fileList)) + s := fmt.Sprintf("%v", fileList) + if num > 4 { + s = fmt.Sprintf("%v...", fileList[:4]) + } + _, xname = xact.GetKindName(kind) + text = fmt.Sprintf("%s[%s]: %s %s from %s", xname, xid, s, action, lr.bck.Cname("")) + } else { + num = pt.Count() + _, xname = xact.GetKindName(kind) + if emptyTemplate { + text = fmt.Sprintf("%s[%s]: %s entire bucket %s", xname, xid, action, lr.bck.Cname("")) + } else { + text = fmt.Sprintf("%s[%s]: %s %q from %s", xname, xid, action, lr.tmplObjs, lr.bck.Cname("")) + } + } + + // 5. progress + showProgress := flagIsSet(c, progressFlag) + if showProgress { + var cpr = cprCtx{ + xname: xname, + xid: xid, + from: lr.bck.Cname(""), + loghdr: text, + } + cpr.totals.objs = num + return cpr.multiobj(c, text) + } + + // 6. otherwise, wait or exit + if !flagIsSet(c, waitFlag) && !flagIsSet(c, waitJobXactFinishedFlag) { + if xid != "" { + text += ". " + toMonitorMsg(c, xid, "") + } + fmt.Fprintln(c.App.Writer, text) + return nil + } + + // wait + var timeout time.Duration + if flagIsSet(c, waitJobXactFinishedFlag) { + timeout = parseDurationFlag(c, waitJobXactFinishedFlag) + } + fmt.Fprintln(c.App.Writer, text+" ...") + xargs := xact.ArgsMsg{ID: xid, Kind: xname, Timeout: timeout} + if err := waitXact(apiBP, &xargs); err != nil { + return err + } + fmt.Fprint(c.App.Writer, fmtXactSucceeded) + return nil +} + +// [DRY-RUN] +func (lr *lrCtx) dry(c *cli.Context, fileList []string, pt *cos.ParsedTemplate) { + if len(fileList) > 0 { + limitedLineWriter(c.App.Writer, + dryRunExamplesCnt, strings.ToUpper(c.Command.Name)+" "+lr.bck.Cname("")+"/%s\n", fileList) + return + } + objs := pt.ToSlice(dryRunExamplesCnt) + limitedLineWriter(c.App.Writer, + dryRunExamplesCnt, strings.ToUpper(c.Command.Name)+" "+lr.bck.Cname("")+"/%s", objs) + if pt.Count() > dryRunExamplesCnt { + fmt.Fprintf(c.App.Writer, "(and %d more)\n", pt.Count()-dryRunExamplesCnt) + } +} + +func (lr *lrCtx) _do(c *cli.Context, fileList []string) (xid, kind, action string, err error) { + switch c.Command.Name { + case commandRemove: + xid, err = api.DeleteMultiObj(apiBP, lr.bck, fileList, lr.tmplObjs) + kind = apc.ActDeleteObjects + action = "rm" + case commandPrefetch: + if err = ensureRemoteProvider(lr.bck); err != nil { + return + } + var msg apc.PrefetchMsg + { + msg.ObjNames = fileList + msg.Template = lr.tmplObjs + msg.LatestVer = flagIsSet(c, latestVersionFlag) + } + xid, err = api.Prefetch(apiBP, lr.bck, msg) + kind = apc.ActPrefetchObjects + action = "prefetch" + case commandEvict: + if err = ensureRemoteProvider(lr.bck); err != nil { + return + } + xid, err = api.EvictMultiObj(apiBP, lr.bck, fileList, lr.tmplObjs) + kind = apc.ActEvictObjects + action = "evict" + default: + debug.Assert(false, c.Command.Name) + } + return xid, kind, action, err } diff --git a/cmd/cli/cli/object_hdlr.go b/cmd/cli/cli/object_hdlr.go index da68a1a1ee1..d236d9e9f24 100644 --- a/cmd/cli/cli/object_hdlr.go +++ b/cmd/cli/cli/object_hdlr.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/NVIDIA/aistore/api" "github.com/NVIDIA/aistore/cmd/cli/teb" @@ -286,6 +287,7 @@ func putHandler(c *cli.Context) error { if flagIsSet(c, dryRunFlag) { dryRunCptn(c) } + // 1. one file if a.srcIsRegular() { debug.Assert(a.src.abspath != "") @@ -303,12 +305,10 @@ func putHandler(c *cli.Context) error { incl := flagIsSet(c, inclSrcDirNameFlag) switch { case len(a.src.fdnames) > 0: - var s string if len(a.src.fdnames) > 1 { - s = " ..." - } - if ok := warnMultiSrcDstPrefix(c, &a, fmt.Sprintf("from [%s%s]", a.src.fdnames[0], s)); !ok { - return nil + if ok := warnMultiSrcDstPrefix(c, &a, fmt.Sprintf("from [%s ...]", a.src.fdnames[0])); !ok { + return nil + } } // a) csv of files and/or directories (names) embedded into the first arg, e.g. "f1[,f2...]" dst-bucket[/prefix] // b) csv from '--list' flag @@ -333,6 +333,7 @@ func putHandler(c *cli.Context) error { // 4. directory var ( + s string ndir int srcpath = a.src.arg ) @@ -340,7 +341,10 @@ func putHandler(c *cli.Context) error { debug.Assert(srcpath == "", srcpath) srcpath = a.pt.Prefix } - if ok := warnMultiSrcDstPrefix(c, &a, fmt.Sprintf("from '%s' directory", srcpath)); !ok { + if !strings.HasSuffix(srcpath, "/") { + s = "/" + } + if ok := warnMultiSrcDstPrefix(c, &a, fmt.Sprintf("from '%s%s'", srcpath, s)); !ok { return nil } fobjs, err := lsFobj(c, srcpath, "", a.dst.oname, &ndir, a.src.recurs, incl) diff --git a/cmd/cli/cli/utils.go b/cmd/cli/cli/utils.go index 84df6c1f6cb..28bbf2af249 100644 --- a/cmd/cli/cli/utils.go +++ b/cmd/cli/cli/utils.go @@ -467,8 +467,8 @@ func headBucket(bck cmn.Bck, dontAddBckMD bool) (p *cmn.Bprops, err error) { // Prints multiple lines of fmtStr to writer w. // For line number i, fmtStr is formatted with values of args at index i -// if maxLines >= 0 prints at most maxLines, otherwise prints everything until -// it reaches the end of one of args +// - if maxLines >= 0 prints at most maxLines +// - otherwise, prints everything until the end of one of the args func limitedLineWriter(w io.Writer, maxLines int, fmtStr string, args ...[]string) { objs := make([]any, 0, len(args)) if fmtStr == "" || fmtStr[len(fmtStr)-1] != '\n' { @@ -613,9 +613,12 @@ func isBucketEmpty(bck cmn.Bck, cached bool) (bool, error) { return len(objList.Entries) == 0, nil } -func ensureHasProvider(bck cmn.Bck) error { +func ensureRemoteProvider(bck cmn.Bck) error { if !apc.IsProvider(bck.Provider) { - return fmt.Errorf("missing backend provider in %q", bck) + return fmt.Errorf("invalid bucket %q: missing backend provider", bck) + } + if !bck.IsRemote() { + return fmt.Errorf("invalid bucket %q: expecting remote backend", bck) } return nil } diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index da9128f76e4..7e786d3fda4 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -4,7 +4,7 @@ go 1.21 // direct require ( - github.com/NVIDIA/aistore v1.3.22-0.20231230192204-539c3858a566 + github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e github.com/fatih/color v1.16.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo v1.16.5 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index b355c9b73d2..2b5caf7de0b 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/NVIDIA/aistore v1.3.22-0.20231230192204-539c3858a566 h1:bBDepGFxoekkGIOTsMVR554StQpD2f8ROLuoLRP1VRA= -github.com/NVIDIA/aistore v1.3.22-0.20231230192204-539c3858a566/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I= +github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e h1:6PgUhYxH7iDCQR97KAnvw1l0TkMvTkA3qdBQ/i72j5Y= +github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/tools/client.go b/tools/client.go index ae14eeb229a..c6cc975f3b3 100644 --- a/tools/client.go +++ b/tools/client.go @@ -234,7 +234,7 @@ func CleanupRemoteBucket(t *testing.T, proxyURL string, bck cmn.Bck, prefix stri } bp := BaseAPIParams(proxyURL) - xid, err := api.DeleteList(bp, bck, toDelete) + xid, err := api.DeleteMultiObj(bp, bck, toDelete, "" /*template*/) tassert.CheckFatal(t, err) args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: BucketCleanupTimeout} _, err = api.WaitForXactionIC(bp, &args) @@ -466,7 +466,7 @@ func BaseAPIParams(urls ...string) api.BaseParams { func EvictObjects(t *testing.T, proxyURL string, bck cmn.Bck, objList []string) { bp := BaseAPIParams(proxyURL) - xid, err := api.EvictList(bp, bck, objList) + xid, err := api.EvictMultiObj(bp, bck, objList, "" /*template*/) if err != nil { t.Errorf("Evict bucket %s failed, err = %v", bck, err) }