Skip to content

Commit

Permalink
blob downloader (new)
Browse files Browse the repository at this point in the history
* add stress test
* add error type: 'range-not-satisfiable'
* part ten, prev. commit: a9bbae5

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Feb 9, 2024
1 parent 34d5b24 commit ae7ae57
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 59 deletions.
47 changes: 38 additions & 9 deletions ais/test/scripted_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestGetWarmValidateRemaisUsingScript(t *testing.T) {
bucketName = bck.Cname("")
cmd = exec.Command("./scripts/remais-get-validate.sh", "--bucket", bucketName)
)
tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " "))
tlog.Logf("Running '%s'\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
Expand All @@ -84,7 +84,7 @@ func TestPrefetchLatestS3UsingScript(t *testing.T) {
bucketName = cliBck.Cname("")
cmd = exec.Command("./scripts/s3-prefetch-latest-prefix.sh", "--bucket", bucketName)
)
tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " "))
tlog.Logf("Running '%s'\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
Expand All @@ -111,7 +111,7 @@ func TestPrefetchLatestRemaisUsingScript(t *testing.T) {
bucketName = bck.Cname("")
cmd = exec.Command("./scripts/remais-prefetch-latest.sh", "--bucket", bucketName)
)
tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " "))
tlog.Logf("Running '%s'\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
Expand All @@ -130,16 +130,18 @@ func TestCopySyncWithOutOfBandUsingRemaisScript(t *testing.T) {
bucketName = bck.Cname("")
cmd = exec.Command("./scripts/cp-sync-remais-out-of-band.sh", "--bucket", bucketName)
)
tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " "))
tlog.Logln("Note: this may take a while...")
tlog.Logf("Running '%s'\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
}
tassert.CheckFatal(t, err)
}

// NOTE: not running with remote `cliBck` (because it could take hours)
// NOTE: not running with an actual remote s3 bucket (could take hours)
// instead, using aistore S3 API with a temp `ais://` bucket, and with two additional workarounds:
// 1. MD5
// 2. "apc.S3Scheme+apc.BckProviderSeparator+bck.Name" (below)
func TestMultipartUploadLargeFilesScript(t *testing.T) {
tools.CheckSkip(t, &tools.SkipTestArgs{
Long: true,
Expand All @@ -153,20 +155,47 @@ func TestMultipartUploadLargeFilesScript(t *testing.T) {

bck := cmn.Bck{Name: trand.String(10), Provider: apc.AIS}

// NOTE: need to set MD5 to satisfy `s3cmd` (for details, see docs/s3cmd.md & docs/s3compat.md)
// 1. set MD5 to satisfy `s3cmd` (for details, see docs/s3cmd.md & docs/s3compat.md)
bprops := &cmn.BpropsToSet{
Cksum: &cmn.CksumConfToSet{Type: apc.String(cos.ChecksumMD5)},
}
tools.CreateBucket(t, proxyURL, bck, bprops, true /*cleanup*/)

// 2. subst "ais://" with "s3://" to circumvent s3cmd failing with "not a recognized URI"
cmd := exec.Command("./scripts/s3-mpt-large-files.sh", tempdir, apc.S3Scheme+apc.BckProviderSeparator+bck.Name,
"1", // number of iterations
"true", // generate large files
"1", // number of large files
)

tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " "))
tlog.Logln("Note: this may take a while...")
tlog.Logf("Running '%s' (this may take a while...)\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
}
tassert.CheckFatal(t, err)
}

// remais-blob-download.sh
func TestRemaisBlobDownloadScript(t *testing.T) {
tools.CheckSkip(t, &tools.SkipTestArgs{
Long: true,
})
bck := cmn.Bck{
Name: trand.String(10),
Ns: cmn.Ns{UUID: tools.RemoteCluster.UUID},
Provider: apc.AIS,
}
tools.CreateBucket(t, proxyURL, bck, nil, true /*cleanup*/)
name := bck.Cname("")
cmd := exec.Command("./scripts/remais-blob-download.sh",
"--bucket", name,
"--minsize", "1MB",
"--maxsize", "10MB",
"--totalsize", "100MB",
"--chunksize", "500K",
"--numworkers", "5")
tlog.Logf("Running '%s' (this may take a while...)\n", cmd.String())
out, err := cmd.CombinedOutput()
if len(out) > 0 {
tlog.Logln(string(out))
Expand Down
38 changes: 29 additions & 9 deletions ais/test/scripts/remais-blob-download.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## Prerequisites: #################################################################################
#!/bin/bash

## Prerequisites: ###############################################################################
# - aistore cluster
# - remote aistore cluster (a.k.a. "remais")
# - optionally, remais bucket (the bucket will be created if doesn't exist)
Expand All @@ -9,8 +11,11 @@
# $ ais show remote-cluster -H
# $ JcHy3JUrL http://127.0.0.1:11080 remais v9 1 11m22.312048996s
#
## Run:
# $./ais/test/scripts/remais-blob-download.sh --bucket ais://@remais/abc --maxsize=10mb --totalsize=1G
## Examples:
# $./remais-blob-download.sh --bucket ais://@remais/abc --maxsize=10mb --totalsize=1G
#
# $./remais-blob-download.sh --bucket ais://@remais/abc --numworkers=5 --chunksize=500kb
#################################################################################################

if ! [ -x "$(command -v ais)" ]; then
echo "Error: ais (CLI) not installed" >&2
Expand All @@ -23,18 +28,25 @@ fi

## Command line options and respective defaults
bucket="ais://@remais/abc"
minsize="10MB"
maxsize="10MB"
totalsize="100MB"

minsize="10MiB"
maxsize="10MiB"
totalsize="100MiB"
chunksize="1MB"
numworkers=4

## runtime
max_num_downloads=100
subdir="blob-$RANDOM" ## destination for aisloader-generated content

while (( "$#" )); do
case "${1}" in
--bucket) bucket=$2; shift; shift;;
--minsize) minsize=$2; shift; shift;;
--maxsize) maxsize=$2; shift; shift;;
--totalsize) totalsize=$2; shift; shift;; *) echo "fatal: unknown argument '${1}'"; exit 1;;
--totalsize) totalsize=$2; shift; shift;;
--chunksize) chunksize=$2; shift; shift;;
--numworkers) numworkers=$2; shift; shift;;
*) echo "fatal: unknown argument '${1}'"; exit 1;;
esac
done

Expand Down Expand Up @@ -79,9 +91,17 @@ ais ls $bucket --all --limit 4
echo "..."
files=$(ais ls $bucket --prefix=$subdir/ --name-only -H --no-footers --all | awk '{print $1}')

## put some limit to it
count=0

## for all listed objects
for f in $files; do
ais blob-download $bucket/$f --wait 1> /dev/null || exit $?
xid=$(ais blob-download $bucket/$f --chunk-size $chunksize --num-workers $numworkers --nv || exit $?)
ais wait $xid || exit $?
count=`expr $count + 1`
if [ $count -ge $max_num_downloads ]; then
break
fi
done

echo "..."
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ func (goi *getOI) parseRange(resphdr http.Header, size int64) (hrng *htrange, er
var ranges []htrange
ranges, err = parseMultiRange(goi.ranges.Range, size)
if err != nil {
if _, ok := err.(*errRangeNoOverlap); ok {
if cmn.IsErrRangeNotSatisfiable(err) {
// https://datatracker.ietf.org/doc/html/rfc7233#section-4.2
resphdr.Set(cos.HdrContentRange, fmt.Sprintf("%s*/%d", cos.HdrContentRangeValPrefix, size))
}
Expand Down
16 changes: 1 addition & 15 deletions ais/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ type (
htrange struct {
Start, Length int64
}
errRangeNoOverlap struct {
ranges []string // RFC 7233
size int64 // [0, size)
}

// Local unicast IP info
localIPv4Info struct {
Expand Down Expand Up @@ -254,16 +250,6 @@ func (r htrange) contentRange(size int64) string {
return fmt.Sprintf("%s%d-%d/%d", cos.HdrContentRangeValPrefix, r.Start, r.Start+r.Length-1, size)
}

// errRangeNoOverlap is returned by parseMultiRange when the first-byte-pos of
// all of the byte-range-spec values is greater than the content size.
// Error implements the error interface: none of the requested byte
// ranges fall within the content's [0, size) extent.
func (e *errRangeNoOverlap) Error() string {
	bounds := fmt.Sprintf("overlap with the content [0, %d)", e.size)
	if n := len(e.ranges); n == 1 {
		return fmt.Sprintf("range %q does not %s", e.ranges[0], bounds)
	}
	return fmt.Sprintf("none of the ranges %v %s", e.ranges, bounds)
}

// parseMultiRange parses a Range header string as per RFC 7233.
// ErrRangeNotSatisfiable is returned if none of the ranges overlap with the [0, size) content.
func parseMultiRange(s string, size int64) (ranges []htrange, err error) {
Expand Down Expand Up @@ -331,7 +317,7 @@ func parseMultiRange(s string, size int64) (ranges []htrange, err error) {
}

if noOverlap && len(ranges) == 0 {
return nil, &errRangeNoOverlap{allRanges, size}
return nil, cmn.NewErrRangeNotSatisfiable(allRanges, size)
}
return ranges, nil
}
Expand Down
6 changes: 3 additions & 3 deletions cmn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ type (
DataSlices int `json:"data_slices"`

// Depending on the object size and `ObjSizeLimit`, the value of `ParitySlices` (or P) indicates:
// - a number of additional parity slices to produce out of the (D) data slices of the object,
// - a number of additional parity slices (generated or _computed_ from the (D) data slices),
// or:
// - a number of full object replicas (copies).
// In all cases, the same rule applies: all the (D + P) slices and/or all (P) additional full copies
// are stored on a different storage nodes (a.k.a. targets) in a cluster.
// In all cases, the same rule applies: all slices and/or all full copies are stored on different
// storage nodes (a.k.a. targets).
ParitySlices int `json:"parity_slices"`

SbundleMult int `json:"bundle_multiplier"` // stream-bundle multiplier: num streams to destination
Expand Down
47 changes: 29 additions & 18 deletions cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ type (
act string
bck *Bck
}
ErrRangeNotSatisfiable struct {
ranges []string // RFC 7233
size int64 // [0, size)
}
)

var (
Expand Down Expand Up @@ -678,9 +682,7 @@ func IsErrSoft(err error) bool {
return errors.As(err, &target)
}

///////////////////////
// ErrLmetaCorrupted & ErrLmetaNotFound
///////////////////////

// NewErrLmetaCorrupted wraps err, marking the local object metadata (lmeta) as corrupted.
func NewErrLmetaCorrupted(err error) *ErrLmetaCorrupted {
	return &ErrLmetaCorrupted{err}
}
// Error returns the message of the wrapped (underlying) error.
func (e *ErrLmetaCorrupted) Error() string {
	return e.err.Error()
}
Expand All @@ -700,9 +702,7 @@ func IsErrLmetaNotFound(err error) bool {
return ok
}

///////////////////////////
// ErrLimitedCoexistence //
///////////////////////////
// ErrLimitedCoexistence

func NewErrLimitedCoexistence(node, xaction, action, detail string) *ErrLimitedCoexistence {
return &ErrLimitedCoexistence{node, xaction, action, detail}
Expand All @@ -713,9 +713,7 @@ func (e *ErrLimitedCoexistence) Error() string {
e.node, e.xaction, e.action, e.detail)
}

////////////////////
// ErrXactUsePrev //
////////////////////
// ErrXactUsePrev

func NewErrXactUsePrev(xaction string) *ErrXactUsePrev {
return &ErrXactUsePrev{xaction}
Expand All @@ -730,9 +728,7 @@ func IsErrXactUsePrev(err error) bool {
return ok
}

//
// ErrInvalidObjName
//

func ValidateObjName(name string) (err *ErrInvalidObjName) {
if cos.IsLastB(name, filepath.Separator) || strings.Contains(name, "../") {
Expand All @@ -745,9 +741,7 @@ func (e *ErrInvalidObjName) Error() string {
return fmt.Sprintf("invalid object name %q", e.name)
}

//
// ErrNotRemoteBck
//

func ValidateRemoteBck(act string, bck *Bck) (err *ErrNotRemoteBck) {
if !bck.IsRemote() {
Expand All @@ -760,9 +754,7 @@ func (e *ErrNotRemoteBck) Error() string {
return fmt.Sprintf("%s: expecting remote bucket (have %s)", e.act, e.bck)
}

///////////////////////
// ErrXactTgtInMaint //
///////////////////////
// ErrXactTgtInMaint

func NewErrXactTgtInMaint(xaction, tname string) *ErrXactTgtInMaint {
return &ErrXactTgtInMaint{xaction, tname}
Expand All @@ -773,9 +765,26 @@ func (e *ErrXactTgtInMaint) Error() string {
e.tname, e.xaction)
}

////////////////////////////
// error grouping helpers //
////////////////////////////
// ErrRangeNotSatisfiable
// http.StatusRequestedRangeNotSatisfiable = 416 // RFC 9110, 15.5.17

// NewErrRangeNotSatisfiable constructs the HTTP 416 (Range Not Satisfiable)
// error from the requested RFC 7233 byte ranges and the content size,
// where the valid extent is [0, size).
func NewErrRangeNotSatisfiable(ranges []string, size int64) *ErrRangeNotSatisfiable {
	return &ErrRangeNotSatisfiable{ranges: ranges, size: size}
}

// Error implements the error interface; the message includes the object
// size and the range(s) that failed to intersect it.
func (e *ErrRangeNotSatisfiable) Error() string {
	size := strconv.FormatInt(e.size, 10)
	return fmt.Sprintf("object size = %s, range%s %v not satisfiable",
		size, cos.Plural(len(e.ranges)), e.ranges)
}

// IsErrRangeNotSatisfiable reports whether err is an HTTP 416 (Range Not
// Satisfiable) error. NOTE: direct type check — err is not unwrapped.
func IsErrRangeNotSatisfiable(err error) bool {
	_, isRange := err.(*ErrRangeNotSatisfiable)
	return isRange
}

//
// more is-error helpers
//

// nought: not a thing
func IsErrBucketNought(err error) bool {
Expand Down Expand Up @@ -1037,6 +1046,8 @@ func WriteErr(w http.ResponseWriter, r *http.Request, err error, opts ...int /*[
status = http.StatusNotFound
} else if IsErrCapExceeded(err) {
status = http.StatusInsufficientStorage
} else if IsErrRangeNotSatisfiable(err) {
status = http.StatusRequestedRangeNotSatisfiable
}

herr.init(r, err, status)
Expand Down
Loading

0 comments on commit ae7ae57

Please sign in to comment.