From ae7ae57b459a54d786563bae84a46978e12050a8 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Fri, 9 Feb 2024 13:06:24 -0500 Subject: [PATCH] blob downloader (new) * add stress test * add error type: 'range-not-satisfiable' * part ten, prev. commit: a9bbae5fc90a842cf Signed-off-by: Alex Aizman --- ais/test/scripted_cli_test.go | 47 +++++++++++++++++++----- ais/test/scripts/remais-blob-download.sh | 38 ++++++++++++++----- ais/tgtobj.go | 2 +- ais/utils.go | 16 +------- cmn/config.go | 6 +-- cmn/err.go | 47 +++++++++++++++--------- xact/xs/blob_download.go | 20 ++++++++-- 7 files changed, 117 insertions(+), 59 deletions(-) diff --git a/ais/test/scripted_cli_test.go b/ais/test/scripted_cli_test.go index c2ebe77371d..3a51f746b4f 100644 --- a/ais/test/scripted_cli_test.go +++ b/ais/test/scripted_cli_test.go @@ -61,7 +61,7 @@ func TestGetWarmValidateRemaisUsingScript(t *testing.T) { bucketName = bck.Cname("") cmd = exec.Command("./scripts/remais-get-validate.sh", "--bucket", bucketName) ) - tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " ")) + tlog.Logf("Running '%s'\n", cmd.String()) out, err := cmd.CombinedOutput() if len(out) > 0 { tlog.Logln(string(out)) @@ -84,7 +84,7 @@ func TestPrefetchLatestS3UsingScript(t *testing.T) { bucketName = cliBck.Cname("") cmd = exec.Command("./scripts/s3-prefetch-latest-prefix.sh", "--bucket", bucketName) ) - tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " ")) + tlog.Logf("Running '%s'\n", cmd.String()) out, err := cmd.CombinedOutput() if len(out) > 0 { tlog.Logln(string(out)) @@ -111,7 +111,7 @@ func TestPrefetchLatestRemaisUsingScript(t *testing.T) { bucketName = bck.Cname("") cmd = exec.Command("./scripts/remais-prefetch-latest.sh", "--bucket", bucketName) ) - tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " ")) + tlog.Logf("Running '%s'\n", cmd.String()) out, err := cmd.CombinedOutput() if len(out) > 0 { tlog.Logln(string(out)) @@ -130,8 +130,7 @@ func 
TestCopySyncWithOutOfBandUsingRemaisScript(t *testing.T) { bucketName = bck.Cname("") cmd = exec.Command("./scripts/cp-sync-remais-out-of-band.sh", "--bucket", bucketName) ) - tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " ")) - tlog.Logln("Note: this may take a while...") + tlog.Logf("Running '%s'\n", cmd.String()) out, err := cmd.CombinedOutput() if len(out) > 0 { tlog.Logln(string(out)) @@ -139,7 +138,10 @@ func TestCopySyncWithOutOfBandUsingRemaisScript(t *testing.T) { tassert.CheckFatal(t, err) } -// NOTE: not running with remote `cliBck` (because it could take hours) +// NOTE: not running with an actual remote s3 bucket (could take hours) +// instead, using aistore S3 API with a temp `ais://` bucket, and with two additional workarounds: +// 1. MD5 +// 2. "apc.S3Scheme+apc.BckProviderSeparator+bck.Name" (below) func TestMultipartUploadLargeFilesScript(t *testing.T) { tools.CheckSkip(t, &tools.SkipTestArgs{ Long: true, }) @@ -153,20 +155,47 @@ func TestMultipartUploadLargeFilesScript(t *testing.T) { bck := cmn.Bck{Name: trand.String(10), Provider: apc.AIS} - // NOTE: need to set MD5 to satisfy `s3cmd` (for details, see docs/s3cmd.md & docs/s3compat.md) + // 1. set MD5 to satisfy `s3cmd` (for details, see docs/s3cmd.md & docs/s3compat.md) bprops := &cmn.BpropsToSet{ Cksum: &cmn.CksumConfToSet{Type: apc.String(cos.ChecksumMD5)}, } tools.CreateBucket(t, proxyURL, bck, bprops, true /*cleanup*/) + // 2.
subst "ais://" with "s3://" to circumvent s3cmd failing with "not a recognized URI" cmd := exec.Command("./scripts/s3-mpt-large-files.sh", tempdir, apc.S3Scheme+apc.BckProviderSeparator+bck.Name, "1", // number of iterations "true", // generate large files "1", // number of large files ) - tlog.Logf("Running '%s %s'\n", cmd.Path, strings.Join(cmd.Args, " ")) - tlog.Logln("Note: this may take a while...") + tlog.Logf("Running '%s' (this may take a while...)\n", cmd.String()) + out, err := cmd.CombinedOutput() + if len(out) > 0 { + tlog.Logln(string(out)) + } + tassert.CheckFatal(t, err) +} + +// remais-blob-download.sh +func TestRemaisBlobDownloadScript(t *testing.T) { + tools.CheckSkip(t, &tools.SkipTestArgs{ + Long: true, + }) + bck := cmn.Bck{ + Name: trand.String(10), + Ns: cmn.Ns{UUID: tools.RemoteCluster.UUID}, + Provider: apc.AIS, + } + tools.CreateBucket(t, proxyURL, bck, nil, true /*cleanup*/) + name := bck.Cname("") + cmd := exec.Command("./scripts/remais-blob-download.sh", + "--bucket", name, + "--minsize", "1MB", + "--maxsize", "10MB", + "--totalsize", "100MB", + "--chunksize", "500K", + "--numworkers", "5") + tlog.Logf("Running '%s' (this may take a while...)\n", cmd.String()) out, err := cmd.CombinedOutput() if len(out) > 0 { tlog.Logln(string(out)) diff --git a/ais/test/scripts/remais-blob-download.sh b/ais/test/scripts/remais-blob-download.sh index 1e36ee525c5..ccbd0f7b330 100755 --- a/ais/test/scripts/remais-blob-download.sh +++ b/ais/test/scripts/remais-blob-download.sh @@ -1,4 +1,6 @@ -## Prerequisites: ################################################################################# +#!/bin/bash + +## Prerequisites: ############################################################################### # - aistore cluster # - remote aistore cluster (a.k.a. 
"remais") # - optionally, remais bucket (the bucket will be created if doesn't exist) @@ -9,8 +11,11 @@ # $ ais show remote-cluster -H # $ JcHy3JUrL http://127.0.0.1:11080 remais v9 1 11m22.312048996s # -## Run: -# $./ais/test/scripts/remais-blob-download.sh --bucket ais://@remais/abc --maxsize=10mb --totalsize=1G +## Examples: +# $./remais-blob-download.sh --bucket ais://@remais/abc --maxsize=10mb --totalsize=1G +# +# $./remais-blob-download.sh --bucket ais://@remais/abc --numworkers=5 --chunksize=500kb +################################################################################################# if ! [ -x "$(command -v ais)" ]; then echo "Error: ais (CLI) not installed" >&2 @@ -23,10 +28,14 @@ fi ## Command line options and respective defaults bucket="ais://@remais/abc" -minsize="10MB" -maxsize="10MB" -totalsize="100MB" - +minsize="10MiB" +maxsize="10MiB" +totalsize="100MiB" +chunksize="1MB" +numworkers=4 + +## runtime +max_num_downloads=100 subdir="blob-$RANDOM" ## destination for aisloader-generated content while (( "$#" )); do @@ -34,7 +43,10 @@ while (( "$#" )); do --bucket) bucket=$2; shift; shift;; --minsize) minsize=$2; shift; shift;; --maxsize) maxsize=$2; shift; shift;; - --totalsize) totalsize=$2; shift; shift;; *) echo "fatal: unknown argument '${1}'"; exit 1;; + --totalsize) totalsize=$2; shift; shift;; + --chunksize) chunksize=$2; shift; shift;; + --numworkers) numworkers=$2; shift; shift;; + *) echo "fatal: unknown argument '${1}'"; exit 1;; esac done @@ -79,9 +91,17 @@ ais ls $bucket --all --limit 4 echo "..." files=$(ais ls $bucket --prefix=$subdir/ --name-only -H --no-footers --all | awk '{print $1}') +## put some limit to it +count=0 + ## for all listed objects for f in $files; do - ais blob-download $bucket/$f --wait 1> /dev/null || exit $? + xid=$(ais blob-download $bucket/$f --chunk-size $chunksize --num-workers $numworkers --nv || exit $?) + ais wait $xid || exit $? 
+ count=`expr $count + 1` + if [ $count -ge $max_num_downloads ]; then + break + fi done echo "..." diff --git a/ais/tgtobj.go b/ais/tgtobj.go index 4244b31d2f8..a8be50eef1a 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -1138,7 +1138,7 @@ func (goi *getOI) parseRange(resphdr http.Header, size int64) (hrng *htrange, er var ranges []htrange ranges, err = parseMultiRange(goi.ranges.Range, size) if err != nil { - if _, ok := err.(*errRangeNoOverlap); ok { + if cmn.IsErrRangeNotSatisfiable(err) { // https://datatracker.ietf.org/doc/html/rfc7233#section-4.2 resphdr.Set(cos.HdrContentRange, fmt.Sprintf("%s*/%d", cos.HdrContentRangeValPrefix, size)) } diff --git a/ais/utils.go b/ais/utils.go index eb2291116a6..dab006e7f34 100644 --- a/ais/utils.go +++ b/ais/utils.go @@ -41,10 +41,6 @@ type ( htrange struct { Start, Length int64 } - errRangeNoOverlap struct { - ranges []string // RFC 7233 - size int64 // [0, size) - } // Local unicast IP info localIPv4Info struct { @@ -254,16 +250,6 @@ func (r htrange) contentRange(size int64) string { return fmt.Sprintf("%s%d-%d/%d", cos.HdrContentRangeValPrefix, r.Start, r.Start+r.Length-1, size) } -// ErrNoOverlap is returned by serveContent's parseRange if first-byte-pos of -// all of the byte-range-spec values is greater than the content size. -func (e *errRangeNoOverlap) Error() string { - msg := fmt.Sprintf("overlap with the content [0, %d)", e.size) - if len(e.ranges) == 1 { - return fmt.Sprintf("range %q does not %s", e.ranges[0], msg) - } - return fmt.Sprintf("none of the ranges %v %s", e.ranges, msg) -} - // ParseMultiRange parses a Range Header string as per RFC 7233. // ErrNoOverlap is returned if none of the ranges overlap with the [0, size) content. 
func parseMultiRange(s string, size int64) (ranges []htrange, err error) { @@ -331,7 +317,7 @@ func parseMultiRange(s string, size int64) (ranges []htrange, err error) { } if noOverlap && len(ranges) == 0 { - return nil, &errRangeNoOverlap{allRanges, size} + return nil, cmn.NewErrRangeNotSatisfiable(allRanges, size) } return ranges, nil } diff --git a/cmn/config.go b/cmn/config.go index 3b9f79c0bf2..e23be6cdf32 100644 --- a/cmn/config.go +++ b/cmn/config.go @@ -206,11 +206,11 @@ type ( DataSlices int `json:"data_slices"` // Depending on the object size and `ObjSizeLimit`, the value of `ParitySlices` (or P) indicates: - // - a number of additional parity slices to produce out of the (D) data slices of the object, + // - a number of additional parity slices (generated or _computed_ from the (D) data slices), // or: // - a number of full object replicas (copies). - // In all cases, the same rule applies: all the (D + P) slices and/or all (P) additional full copies - // are stored on a different storage nodes (a.k.a. targets) in a cluster. + // In all cases, the same rule applies: all slices and/or all full copies are stored on different + // storage nodes (a.k.a. targets). 
ParitySlices int `json:"parity_slices"` SbundleMult int `json:"bundle_multiplier"` // stream-bundle multiplier: num streams to destination diff --git a/cmn/err.go b/cmn/err.go index c2284056e26..74c6a917da6 100644 --- a/cmn/err.go +++ b/cmn/err.go @@ -197,6 +197,10 @@ type ( act string bck *Bck } + ErrRangeNotSatisfiable struct { + ranges []string // RFC 7233 + size int64 // [0, size) + } ) var ( @@ -678,9 +682,7 @@ func IsErrSoft(err error) bool { return errors.As(err, &target) } -/////////////////////// // ErrLmetaCorrupted & ErrLmetaNotFound -/////////////////////// func NewErrLmetaCorrupted(err error) *ErrLmetaCorrupted { return &ErrLmetaCorrupted{err} } func (e *ErrLmetaCorrupted) Error() string { return e.err.Error() } @@ -700,9 +702,7 @@ func IsErrLmetaNotFound(err error) bool { return ok } -/////////////////////////// -// ErrLimitedCoexistence // -/////////////////////////// +// ErrLimitedCoexistence func NewErrLimitedCoexistence(node, xaction, action, detail string) *ErrLimitedCoexistence { return &ErrLimitedCoexistence{node, xaction, action, detail} @@ -713,9 +713,7 @@ func (e *ErrLimitedCoexistence) Error() string { e.node, e.xaction, e.action, e.detail) } -//////////////////// -// ErrXactUsePrev // -//////////////////// +// ErrXactUsePrev func NewErrXactUsePrev(xaction string) *ErrXactUsePrev { return &ErrXactUsePrev{xaction} @@ -730,9 +728,7 @@ func IsErrXactUsePrev(err error) bool { return ok } -// // ErrInvalidObjName -// func ValidateObjName(name string) (err *ErrInvalidObjName) { if cos.IsLastB(name, filepath.Separator) || strings.Contains(name, "../") { @@ -745,9 +741,7 @@ func (e *ErrInvalidObjName) Error() string { return fmt.Sprintf("invalid object name %q", e.name) } -// // ErrNotRemoteBck -// func ValidateRemoteBck(act string, bck *Bck) (err *ErrNotRemoteBck) { if !bck.IsRemote() { @@ -760,9 +754,7 @@ func (e *ErrNotRemoteBck) Error() string { return fmt.Sprintf("%s: expecting remote bucket (have %s)", e.act, e.bck) } -/////////////////////// 
-// ErrXactTgtInMaint // -/////////////////////// +// ErrXactTgtInMaint func NewErrXactTgtInMaint(xaction, tname string) *ErrXactTgtInMaint { return &ErrXactTgtInMaint{xaction, tname} @@ -773,9 +765,26 @@ func (e *ErrXactTgtInMaint) Error() string { e.tname, e.xaction) } -//////////////////////////// -// error grouping helpers // -//////////////////////////// +// ErrRangeNotSatisfiable +// http.StatusRequestedRangeNotSatisfiable = 416 // RFC 9110, 15.5.17 + +func NewErrRangeNotSatisfiable(ranges []string, size int64) *ErrRangeNotSatisfiable { + return &ErrRangeNotSatisfiable{ranges, size} +} + +func (e *ErrRangeNotSatisfiable) Error() string { + s := "object size = " + strconv.FormatInt(e.size, 10) + return fmt.Sprintf("%s, range%s %v not satisfiable", s, cos.Plural(len(e.ranges)), e.ranges) +} + +func IsErrRangeNotSatisfiable(err error) bool { + _, ok := err.(*ErrRangeNotSatisfiable) + return ok +} + +// +// more is-error helpers +// // nought: not a thing func IsErrBucketNought(err error) bool { @@ -1037,6 +1046,8 @@ func WriteErr(w http.ResponseWriter, r *http.Request, err error, opts ...int /*[ status = http.StatusNotFound } else if IsErrCapExceeded(err) { status = http.StatusInsufficientStorage + } else if IsErrRangeNotSatisfiable(err) { + status = http.StatusRequestedRangeNotSatisfiable } herr.init(r, err, status) diff --git a/xact/xs/blob_download.go b/xact/xs/blob_download.go index f2df4a84cb0..fe525c982d6 100644 --- a/xact/xs/blob_download.go +++ b/xact/xs/blob_download.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "sync" @@ -63,6 +64,7 @@ type ( err error sgl *memsys.SGL roff int64 + code int } blobFactory struct { xreg.RenewBase @@ -252,9 +254,14 @@ outer: select { case done := <-r.doneCh: sgl := done.sgl - if r.p.args.fullSize == done.roff+sgl.Size() || done.err == io.EOF { + if r.p.args.fullSize <= done.roff+sgl.Size() || done.code == http.StatusRequestedRangeNotSatisfiable { eof = true - if r.p.args.fullSize > 
done.roff+sgl.Size() { + switch { + case r.p.args.fullSize < done.roff+sgl.Size(): + err = fmt.Errorf("%s: detected size increase during download: expected %d, have %d", + r.Name(), r.p.args.fullSize, done.roff+sgl.Size()) + goto fin + case r.p.args.fullSize > done.roff+sgl.Size(): err = fmt.Errorf("%s: premature EOF: expected size %d, have %d", r.Name(), r.p.args.fullSize, done.roff+sgl.Size()) goto fin @@ -412,18 +419,23 @@ func (reader *blobReader) run() { if reader.parent.IsAborted() { break } + if res.ErrCode == http.StatusRequestedRangeNotSatisfiable { + debug.Assert(res.Size == 0) + reader.parent.doneCh <- blobDone{nil, sgl, msg.roff, http.StatusRequestedRangeNotSatisfiable} + break + } if err = res.Err; err == nil { written, err = io.Copy(sgl, res.R) } if err != nil { - reader.parent.doneCh <- blobDone{err, sgl, msg.roff} + reader.parent.doneCh <- blobDone{err, sgl, msg.roff, res.ErrCode} break } debug.Assert(res.Size == written, res.Size, " ", written) debug.Assert(sgl.Size() == written, sgl.Size(), " ", written) debug.Assert(sgl.Size() == sgl.Len(), sgl.Size(), " ", sgl.Len()) - reader.parent.doneCh <- blobDone{nil, sgl, msg.roff} + reader.parent.doneCh <- blobDone{nil, sgl, msg.roff, res.ErrCode} } reader.parent.wg.Done() }