Skip to content

Commit

Permalink
[API change] prefetch v2; multi-object operations
Browse files Browse the repository at this point in the history
* prev. commit: 7a525d7
* part two

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 2, 2024
1 parent 67361c0 commit 3699b60
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 274 deletions.
2 changes: 1 addition & 1 deletion ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func awsErrorToAISError(awsError error, bck *cmn.Bck) (int, error) {
case s3.ErrCodeNoSuchBucket:
return status, cmn.NewErrRemoteBckNotFound(bck)
case s3.ErrCodeNoSuchKey:
debug.Assert(status == http.StatusNotFound) // expected
debug.Assert(status == http.StatusNotFound, status) // expected
fallthrough
default:
return status, _awsErr(awsError)
Expand Down
4 changes: 2 additions & 2 deletions ais/test/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func TestDownloadRemote(t *testing.T) {
}

tlog.Logf("(1) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck)
xid, err := api.EvictList(baseParams, test.srcBck, expectedObjs)
xid, err := api.EvictMultiObj(baseParams, test.srcBck, expectedObjs, "" /*template*/)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -581,7 +581,7 @@ func TestDownloadRemote(t *testing.T) {

// Test cancellation
tlog.Logf("(2) evicting a _list_ of objects from remote bucket %s...\n", test.srcBck)
xid, err = api.EvictList(baseParams, test.srcBck, expectedObjs)
xid, err = api.EvictMultiObj(baseParams, test.srcBck, expectedObjs, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down
16 changes: 10 additions & 6 deletions ais/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,19 +1369,23 @@ func TestAtimePrefetch(t *testing.T) {
for obj := range nameCh {
objs = append(objs, obj)
}
xid, err := api.EvictList(baseParams, bck, objs)
xid, err := api.EvictMultiObj(baseParams, bck, objs, "" /*template*/)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

timeAfterPut := time.Now()

xid, err = api.PrefetchList(baseParams, bck, objs)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
{
var msg apc.PrefetchMsg
msg.ObjNames = objs
xid, err = api.Prefetch(baseParams, bck, msg)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}

timeFormat := time.RFC3339Nano
msg := &apc.LsoMsg{Props: apc.GetPropsAtime, TimeFormat: timeFormat, Prefix: objPath}
Expand Down
58 changes: 35 additions & 23 deletions ais/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,32 +356,40 @@ func TestSameBucketName(t *testing.T) {
}

tlog.Logf("PrefetchList num=%d\n", len(files))
prefetchListID, err := api.PrefetchList(baseParams, bckRemote, files)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
{
var msg apc.PrefetchMsg
msg.ObjNames = files
prefetchListID, err := api.Prefetch(baseParams, bckRemote, msg)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}

tlog.Logf("PrefetchRange %s\n", objRange)
prefetchRangeID, err := api.PrefetchRange(baseParams, bckRemote, objRange)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
{
var msg apc.PrefetchMsg
msg.Template = objRange
prefetchRangeID, err := api.Prefetch(baseParams, bckRemote, msg)
tassert.CheckFatal(t, err)
args := xact.ArgsMsg{ID: prefetchRangeID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}

// delete one obj from remote, and check evictions (below)
err = api.DeleteObject(baseParams, bckRemote, fileName1)
tassert.CheckFatal(t, err)

tlog.Logf("EvictList %v\n", files)
evictListID, err := api.EvictList(baseParams, bckRemote, files)
evictListID, err := api.EvictMultiObj(baseParams, bckRemote, files, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
args := xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.Errorf(t, err != nil, "list iterator must produce not-found when not finding listed objects")

tlog.Logf("EvictRange\n")
evictRangeID, err := api.EvictRange(baseParams, bckRemote, objRange)
evictRangeID, err := api.EvictMultiObj(baseParams, bckRemote, nil /*lst objnames*/, objRange)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: evictRangeID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -411,29 +419,33 @@ func TestSameBucketName(t *testing.T) {
tassert.CheckFatal(t, err)

// Prefetch/Evict should work
prefetchListID, err = api.PrefetchList(baseParams, bckRemote, files)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
{
var msg apc.PrefetchMsg
msg.ObjNames = files
prefetchListID, err := api.Prefetch(baseParams, bckRemote, msg)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: prefetchListID, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}

evictListID, err = api.EvictList(baseParams, bckRemote, files)
evictListID, err = api.EvictMultiObj(baseParams, bckRemote, files, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: evictListID, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Delete from cloud bucket
tlog.Logf("Deleting %s and %s from cloud bucket ...\n", fileName1, fileName2)
deleteID, err := api.DeleteList(baseParams, bckRemote, files)
deleteID, err := api.DeleteMultiObj(baseParams, bckRemote, files, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

// Delete from ais bucket
tlog.Logf("Deleting %s and %s from ais bucket ...\n", fileName1, fileName2)
deleteID, err = api.DeleteList(baseParams, bckLocal, files)
deleteID, err = api.DeleteMultiObj(baseParams, bckLocal, files, "" /*template*/)
tassert.CheckFatal(t, err)
args = xact.ArgsMsg{ID: deleteID, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -1548,11 +1560,11 @@ func TestOperationsWithRanges(t *testing.T) {
msg = &apc.LsoMsg{Prefix: "test/"}
)
if evict {
xid, err = api.EvictRange(baseParams, b, test.rangeStr)
xid, err = api.EvictMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr)
msg.Flags = apc.LsObjCached
kind = apc.ActEvictObjects
} else {
xid, err = api.DeleteRange(baseParams, b, test.rangeStr)
xid, err = api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr)
kind = apc.ActDeleteObjects
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ais/test/objprops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func propsEvict(t *testing.T, proxyURL string, bck cmn.Bck, objMap map[string]st
}

baseParams := tools.BaseAPIParams(proxyURL)
xid, err := api.EvictList(baseParams, bck, toEvictList)
xid, err := api.EvictMultiObj(baseParams, bck, toEvictList, "" /*template*/)
if err != nil {
t.Errorf("Failed to evict objects: %v\n", err)
}
Expand Down
38 changes: 23 additions & 15 deletions ais/test/regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ func TestPrefetchList(t *testing.T) {

// 2. Evict those objects from the cache and prefetch them
tlog.Logf("Evicting and prefetching %d objects\n", len(m.objNames))
xid, err := api.EvictList(baseParams, bck, m.objNames)
xid, err := api.EvictMultiObj(baseParams, bck, m.objNames, "" /*template*/)
if err != nil {
t.Error(err)
}
Expand All @@ -755,9 +755,13 @@ func TestPrefetchList(t *testing.T) {
tassert.CheckFatal(t, err)

// 3. Prefetch evicted objects
xid, err = api.PrefetchList(baseParams, bck, m.objNames)
if err != nil {
t.Error(err)
{
var msg apc.PrefetchMsg
msg.ObjNames = m.objNames
xid, err = api.Prefetch(baseParams, bck, msg)
if err != nil {
t.Error(err)
}
}

args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
Expand Down Expand Up @@ -817,7 +821,7 @@ func TestDeleteList(t *testing.T) {
tlog.Logf("PUT done.\n")

// 2. Delete the objects
xid, err := api.DeleteList(baseParams, b, files)
xid, err := api.DeleteMultiObj(baseParams, b, files, "" /*template*/)
tassert.CheckError(t, err)

args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
Expand Down Expand Up @@ -875,17 +879,21 @@ func TestPrefetchRange(t *testing.T) {
// 3. Evict those objects from the cache, and then prefetch them
rng := fmt.Sprintf("%s%s", m.prefix, prefetchRange)
tlog.Logf("Evicting and prefetching %d objects (range: %s)\n", len(files), rng)
xid, err := api.EvictRange(baseParams, bck, rng)
xid, err := api.EvictMultiObj(baseParams, bck, nil /*lst objnames*/, rng)
tassert.CheckError(t, err)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActEvictObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)

xid, err = api.PrefetchRange(baseParams, bck, rng)
tassert.CheckError(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
{
var msg apc.PrefetchMsg
msg.Template = rng
xid, err = api.Prefetch(baseParams, bck, msg)
tassert.CheckError(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActPrefetchObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
tassert.CheckFatal(t, err)
}

// 4. Ensure all done
xargs := xact.ArgsMsg{ID: xid, Timeout: tools.RebalanceTimeout}
Expand Down Expand Up @@ -954,7 +962,7 @@ func TestDeleteRange(t *testing.T) {

// 2. Delete the small range of objects
tlog.Logf("Delete in range %s\n", smallrange)
xid, err := api.DeleteRange(baseParams, b, smallrange)
xid, err := api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, smallrange)
tassert.CheckError(t, err)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -983,7 +991,7 @@ func TestDeleteRange(t *testing.T) {

tlog.Logf("Delete in range %s\n", bigrange)
// 4. Delete the big range of objects
xid, err = api.DeleteRange(baseParams, b, bigrange)
xid, err = api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, bigrange)
tassert.CheckError(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -1060,7 +1068,7 @@ func TestStressDeleteRange(t *testing.T) {

// 2. Delete a range of objects
tlog.Logf("Deleting objects in range: %s\n", partialRange)
xid, err := api.DeleteRange(baseParams, bck, partialRange)
xid, err := api.DeleteMultiObj(baseParams, bck, nil /*lst objnames*/, partialRange)
tassert.CheckError(t, err)
args := xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down Expand Up @@ -1092,7 +1100,7 @@ func TestStressDeleteRange(t *testing.T) {

// 4. Delete the entire range of objects
tlog.Logf("Deleting objects in range: %s\n", fullRange)
xid, err = api.DeleteRange(baseParams, bck, fullRange)
xid, err = api.DeleteMultiObj(baseParams, bck, nil /*lst objnames*/, fullRange)
tassert.CheckError(t, err)
args = xact.ArgsMsg{ID: xid, Kind: apc.ActDeleteObjects, Timeout: tools.RebalanceTimeout}
_, err = api.WaitForXactionIC(baseParams, &args)
Expand Down
98 changes: 98 additions & 0 deletions ais/test/scripts/s3-prefetch-latest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/bin/bash

## Prerequisites: #################################################################################
# - s3 bucket
# - s3cmd, $PATH-executable and configured to access the bucket out-of-band
# - aistore cluster, also configured to access the same bucket
#
## Example usage:
## ./ais/test/scripts/s3-prefetch-latest.sh --bucket s3://abc ########################

lorem='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.'

duis='Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Et harum quidem..'

## Command line options (and their respective defaults)
bucket="s3://abc"

## constants
sum1="xxhash[ad97df912d23103f]"
sum2="xxhash[ecb5ed42299ea74d]"

host="--host=s3.amazonaws.com"

while (( "$#" )); do
case "${1}" in
--bucket) bucket=$2; shift; shift;;
*) echo "fatal: unknown argument '${1}'"; exit 1;;
esac
done

## uncomment for verbose output
set -x ## DEBUG

## establish existence
ais show bucket $bucket -c 1>/dev/null || exit $?

cleanup() {
rc=$?
ais object rm "$bucket/lorem-duis" 1>/dev/null 2>&1
exit $rc
}

trap cleanup EXIT INT TERM

echo -e
ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)"
echo -e

echo "1. out-of-band PUT: first version"
echo $lorem | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "2. prefetch, and check"
ais prefetch "$bucket/lorem-duis"
sleep 5 ############################# DEBUG
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum1" ]] || { echo "FAIL: $checksum != $sum1"; exit 1; }

echo "3. out-of-band PUT: 2nd version (overwrite)"
echo $duis | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "4. prefetch and check (expecting the first version's checksum)"
ais prefetch "$bucket/lorem-duis"
sleep 5 ############################# DEBUG
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" != "$sum2" ]] || { echo "FAIL: $checksum == $sum2"; exit 1; }

echo "5. query cold-get count (statistics)"
cnt1=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}')

echo "6. prefetch latest: detect version change and trigger cold GET"
ais prefetch "$bucket/lorem-duis" --latest
sleep 5 ############################# DEBUG
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; }

echo "7. cold-get counter must increment"
cnt2=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}')
[[ $cnt2 == $(($cnt1+1)) ]] || { echo "FAIL: $cnt2 != $(($cnt1+1))"; exit 1; }

echo "8. warm GET must remain \"warm\" and cold-get-count must not increment"
ais get "$bucket/lorem-duis" /dev/null 1>/dev/null
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; }

cnt3=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}')
[[ $cnt3 == $cnt2 ]] || { echo "FAIL: $cnt3 != $cnt2"; exit 1; }

echo "9. out-of-band DELETE"
s3cmd del "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "10. warm GET must (silently) trigger deletion"
ais get "$bucket/lorem-duis" /dev/null --silent 1>/dev/null 2>&1
[[ $? != 0 ]] || { echo "FAIL: expecting GET error, got $?"; exit 1; }
ais ls "$bucket/lorem-duis" --cached --silent -H 2>/dev/null
[[ $? != 0 ]] || { echo "FAIL: expecting 'show object' error, got $?"; exit 1; }

echo -e
ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)"
Loading

0 comments on commit 3699b60

Please sign in to comment.