Skip to content

Commit

Permalink
prefetch v2; cold-get stats; put size
Browse files Browse the repository at this point in the history
* cold-get counters: semantics; add missing
* add scripted test: prefetch latest
  - requires s3cmd
* free put params back to pool
* consistently provide put size - all scenarios except one dsort shard
* up cli
* part three, prev. commit: 3699b60

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 3, 2024
1 parent 3699b60 commit 6e4414b
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 35 deletions.
1 change: 1 addition & 0 deletions ais/backend/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func (awsp *awsProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT)
}
params := allocPutObjParams(res, owt)
err := awsp.t.PutObject(lom, params)
core.FreePutObjParams(params)
if superVerbose {
nlog.Infoln("[get_object]", lom.String(), err)
}
Expand Down
1 change: 1 addition & 0 deletions ais/backend/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func (ap *azureProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT)
}
params := allocPutObjParams(res, owt)
err := ap.t.PutObject(lom, params)
core.FreePutObjParams(params)
if superVerbose {
nlog.Infoln("[get_object]", lom.String(), err)
}
Expand Down
1 change: 1 addition & 0 deletions ais/backend/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func allocPutObjParams(res core.GetReaderResult, owt cmn.OWT) *core.PutObjectPar
params.Reader = res.R
params.OWT = owt
params.Cksum = res.ExpCksum
params.Size = res.Size
params.Atime = time.Now()
params.ColdGET = true
}
Expand Down
1 change: 1 addition & 0 deletions ais/backend/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (gcpp *gcpProvider) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT)
}
params := allocPutObjParams(res, owt)
err := gcpp.t.PutObject(lom, params)
core.FreePutObjParams(params)
if superVerbose {
nlog.Infoln("[get_object]", lom.String(), err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package integration_test.
/*
* Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved.
*/
package integration_test

Expand Down Expand Up @@ -63,3 +63,25 @@ func TestGetWarmValidateRemaisUsingScript(t *testing.T) {
}
tassert.CheckFatal(t, err)
}

// TestPrefetchLatestUsingScript drives the "prefetch latest" scenario via an
// external shell script: it executes ./scripts/s3-prefetch-latest.sh, passing
// the CLI bucket (cliBck) through the --bucket option, logs whatever the script
// printed, and hands the resulting error to tassert.CheckFatal.
//
// The test is skipped unless cliBck's provider normalizes to apc.AWS, since
// the script relies on s3cmd and an s3 bucket.
func TestPrefetchLatestUsingScript(t *testing.T) {
	skipArgs := &tools.SkipTestArgs{
		CloudBck: true,
		Bck:      cliBck,
	}
	tools.CheckSkip(t, skipArgs)

	// One more restriction on top of the generic skip conditions above: AWS only.
	// The normalization error is discarded; only the normalized provider value
	// is compared against apc.AWS.
	if provider, _ := cmn.NormalizeProvider(cliBck.Provider); provider != apc.AWS {
		t.Skipf("skipping %s - the test uses s3cmd (command line tool) and requires s3 bucket (see \"prerequisites\")", t.Name())
	}

	script := exec.Command("./scripts/s3-prefetch-latest.sh", "--bucket", cliBck.Cname(""))

	// stdout and stderr come back combined; log them (when present) before
	// checking the error so that the script's own diagnostics are visible
	output, err := script.CombinedOutput()
	if len(output) != 0 {
		tlog.Logln(string(output))
	}
	tassert.CheckFatal(t, err)
}
22 changes: 5 additions & 17 deletions ais/test/scripts/s3-prefetch-latest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ while (( "$#" )); do
done

## uncomment for verbose output
set -x ## DEBUG
## set -x

## establish existence
ais show bucket $bucket -c 1>/dev/null || exit $?
Expand All @@ -50,26 +50,23 @@ echo "1. out-of-band PUT: first version"
echo $lorem | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "2. prefetch, and check"
ais prefetch "$bucket/lorem-duis"
sleep 5 ############################# DEBUG
ais prefetch "$bucket/lorem-duis" --wait
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum1" ]] || { echo "FAIL: $checksum != $sum1"; exit 1; }

echo "3. out-of-band PUT: 2nd version (overwrite)"
echo $duis | s3cmd put - "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "4. prefetch and check (expecting the first version's checksum)"
ais prefetch "$bucket/lorem-duis"
sleep 5 ############################# DEBUG
ais prefetch "$bucket/lorem-duis" --wait
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" != "$sum2" ]] || { echo "FAIL: $checksum == $sum2"; exit 1; }

echo "5. query cold-get count (statistics)"
cnt1=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}')

echo "6. prefetch latest: detect version change and trigger cold GET"
ais prefetch "$bucket/lorem-duis" --latest
sleep 5 ############################# DEBUG
ais prefetch "$bucket/lorem-duis" --latest --wait
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; }

Expand All @@ -78,21 +75,12 @@ cnt2=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{pr
[[ $cnt2 == $(($cnt1+1)) ]] || { echo "FAIL: $cnt2 != $(($cnt1+1))"; exit 1; }

echo "8. warm GET must remain \"warm\" and cold-get-count must not increment"
ais get "$bucket/lorem-duis" /dev/null 1>/dev/null
ais get "$bucket/lorem-duis" /dev/null --latest 1>/dev/null
checksum=$(ais ls "$bucket/lorem-duis" --cached -H -props checksum | awk '{print $2}')
[[ "$checksum" == "$sum2" ]] || { echo "FAIL: $checksum != $sum2"; exit 1; }

cnt3=$(ais show performance counters --regex GET-COLD -H | awk '{sum+=$2;}END{print sum;}')
[[ $cnt3 == $cnt2 ]] || { echo "FAIL: $cnt3 != $cnt2"; exit 1; }

echo "9. out-of-band DELETE"
s3cmd del "$bucket/lorem-duis" $host 1>/dev/null || exit $?

echo "10. warm GET must (silently) trigger deletion"
ais get "$bucket/lorem-duis" /dev/null --silent 1>/dev/null 2>&1
[[ $? != 0 ]] || { echo "FAIL: expecting GET error, got $?"; exit 1; }
ais ls "$bucket/lorem-duis" --cached --silent -H 2>/dev/null
[[ $? != 0 ]] || { echo "FAIL: expecting 'show object' error, got $?"; exit 1; }

echo -e
ais show performance counters --regex "(GET-COLD$|VERSION-CHANGE$|DELETE)"
16 changes: 14 additions & 2 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/memsys"
"github.com/NVIDIA/aistore/stats"
"github.com/NVIDIA/aistore/transport/bundle"
"github.com/NVIDIA/aistore/xact/xreg"
)
Expand Down Expand Up @@ -75,6 +77,7 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutObjectParams) error {
poi.workFQN = workFQN
poi.atime = params.Atime.UnixNano()
poi.xctn = params.Xact
poi.size = params.Size
poi.owt = params.OWT
poi.skipEC = params.SkipEC
poi.coldGET = params.ColdGET
Expand All @@ -84,6 +87,7 @@ func (t *target) PutObject(lom *core.LOM, params *core.PutObjectParams) error {
}
_, err := poi.putObject()
freePOI(poi)
debug.Assert(err != nil || params.Size <= 0 || params.Size == lom.SizeBytes(true), lom.String(), params.Size, lom.SizeBytes(true))
return err
}

Expand Down Expand Up @@ -155,7 +159,7 @@ func (t *target) CopyObject(lom *core.LOM, dm core.DM, dp core.DP, xact core.Xac
return size, err
}

// compare with goi.getCold
// use `backend.GetObj` (compare w/ other instances calling `backend.GetObjReader`)
func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCode int, err error) {
// 1. lock
switch owt {
Expand All @@ -178,7 +182,8 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCo
return
}

// 2. get from the remote B
// 2. GET remote object and store it
now := mono.NanoTime()
if errCode, err = t.Backend(lom.Bck()).GetObj(ctx, lom, owt); err != nil {
if owt != cmn.OwtGetPrefetchLock {
lom.Unlock(true)
Expand All @@ -194,6 +199,13 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (errCo
case cmn.OwtGetTryLock, cmn.OwtGetLock:
lom.Unlock(true)
}

// 4. stats
t.statsT.AddMany(
cos.NamedVal64{Name: stats.GetColdCount, Value: 1},
cos.NamedVal64{Name: stats.GetColdSize, Value: lom.SizeBytes()},
cos.NamedVal64{Name: stats.GetColdRwLatency, Value: mono.SinceNano(now)},
)
return
}

Expand Down
14 changes: 6 additions & 8 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (poi *putOI) write() (buf []byte, slab *memsys.Slab, lmfh *os.File, err err
if lmfh, err = poi.lom.CreateFile(poi.workFQN); err != nil {
return
}
if poi.size == 0 {
if poi.size <= 0 {
buf, slab = poi.t.gmm.Alloc()
} else {
buf, slab = poi.t.gmm.AllocSize(poi.size)
Expand Down Expand Up @@ -619,7 +619,7 @@ do:
}
goi.lom.SetAtimeUnix(goi.atime)

if loaded, err = goi.coldLock(); err != nil {
if loaded, err = goi._coldLock(); err != nil {
return
}
if loaded {
Expand All @@ -629,7 +629,7 @@ do:
// zero-out prev. version custom metadata, if any
goi.lom.SetCustomMD(nil)

// backend: read remote
// get remote reader (compare w/ t.GetCold)
res = goi.t.Backend(goi.lom.Bck()).GetObjReader(goi.ctx, goi.lom)
if res.Err != nil {
goi.lom.Unlock(true)
Expand All @@ -653,7 +653,7 @@ do:
}

// regular path
errCode, err = goi.coldPut(&res)
errCode, err = goi._coldPut(&res)
if err != nil {
goi.unlocked = true
return
Expand Down Expand Up @@ -687,7 +687,7 @@ fin:

// upgrade rlock => wlock
// done early to prevent multiple cold-readers duplicating network/disk operation and overwriting each other
func (goi *getOI) coldLock() (loaded bool, err error) {
func (goi *getOI) _coldLock() (loaded bool, err error) {
var (
t, lom = goi.t, goi.lom
now int64
Expand All @@ -713,8 +713,7 @@ outer:
return
}

// see also: t.GetCold() and goi.coldMem()
func (goi *getOI) coldPut(res *core.GetReaderResult) (int, error) {
func (goi *getOI) _coldPut(res *core.GetReaderResult) (int, error) {
var (
t, lom = goi.t, goi.lom
poi = allocPOI()
Expand All @@ -739,7 +738,6 @@ func (goi *getOI) coldPut(res *core.GetReaderResult) (int, error) {
nlog.Infoln(ftcg+"(put)", lom.Cname(), err)
return code, err
}
goi.t.statsT.Add(stats.GetColdRwLatency, mono.SinceNano(goi.ltime))

// load, downgrade lock, inc stats
if err = lom.Load(true /*cache it*/, true /*locked*/); err != nil {
Expand Down
17 changes: 14 additions & 3 deletions cmd/cli/cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,21 @@ func ensureRemoteProvider(bck cmn.Bck) error {
if !apc.IsProvider(bck.Provider) {
return fmt.Errorf("invalid bucket %q: missing backend provider", bck)
}
if !bck.IsRemote() {
return fmt.Errorf("invalid bucket %q: expecting remote backend", bck)
if bck.IsRemote() {
return nil
}
return nil
if bck.Props == nil {
// double-take: ais:// bucket with remote backend?
p, err := headBucket(bck, true)
if err != nil {
return err
}
bck.Props = p
if bck.IsRemote() {
return nil // yes it is
}
}
return fmt.Errorf("invalid bucket %q: expecting remote backend", bck)
}

func parseURLtoBck(strURL string) (bck cmn.Bck) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

// direct
require (
github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e
github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5
github.com/fatih/color v1.16.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e h1:6PgUhYxH7iDCQR97KAnvw1l0TkMvTkA3qdBQ/i72j5Y=
github.com/NVIDIA/aistore v1.3.22-0.20240102182613-70b89290d39e/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5 h1:H0MwsbtH+5oZczV1LGPVCBCLk3/uX8S5y8nDJZnyA+U=
github.com/NVIDIA/aistore v1.3.22-0.20240102182946-3699b60ddad5/go.mod h1:jpWmGuqxnY+akx81S5eqHhGdgSENm0mVYRrVbpCW4/I=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
3 changes: 2 additions & 1 deletion core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (*LDP) Reader(lom *LOM, latestVer bool) (cos.ReadOpenCloser, cos.OAH, error
}

remote:
// cold GetObjReader and return oah (holder) to represent non-existing object
// GetObjReader and return remote (object) reader and oah for object metadata
// (compare w/ T.GetCold)
lom.SetAtimeUnix(time.Now().UnixNano())
oah := &cmn.ObjAttrs{
Ver: "", // TODO: differentiate between copying (same version) vs. transforming
Expand Down
1 change: 1 addition & 0 deletions core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type (
Atime time.Time
Xact Xact
WorkTag string // (=> work fqn)
Size int64
OWT cmn.OWT
SkipEC bool // don't erasure-code when finalizing
ColdGET bool // this PUT is in fact a cold-GET
Expand Down
1 change: 1 addition & 0 deletions ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func writeObject(lom *core.LOM, reader io.Reader, size int64, xctn core.Xact) er
params.Reader = readCloser
params.SkipEC = true
params.Atime = time.Now()
params.Size = size
params.Xact = xctn
// to avoid changing version; TODO: introduce cmn.OwtEC
params.OWT = cmn.OwtMigrateRepl
Expand Down
1 change: 1 addition & 0 deletions ext/dload/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (task *singleTask) _dput(lom *core.LOM, req *http.Request, resp *http.Respo
params.Reader = r
params.OWT = cmn.OwtPut
params.Atime = task.started.Load()
params.Size = size
params.Xact = task.xdl
}
erp := core.T.PutObject(lom, params)
Expand Down
2 changes: 2 additions & 0 deletions ext/dsort/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ func (m *Manager) createShard(s *shard.Shard, lom *core.LOM) (err error) {

// TODO: params.Xact - in part, to count PUTs and bytes in a generic fashion
// (vs metrics.ShardCreationStats.updateThroughput - see below)

// TODO: add params.Size = (size resulting from shardRW.Create below)
}
err = core.T.PutObject(lom, params)
core.FreePutObjParams(params)
Expand Down
1 change: 1 addition & 0 deletions ext/dsort/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func (m *Manager) recvShard(hdr *transport.ObjHdr, objReader io.Reader, err erro
params.Reader = rc
params.Cksum = nil
params.Atime = started
params.Size = hdr.ObjAttrs.Size
}
erp := core.T.PutObject(lom, params)
core.FreePutObjParams(params)
Expand Down
4 changes: 4 additions & 0 deletions stats/target_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
// -> "*.id" - ID
const (
// KindCounter & KindSize - always incremented

// NOTE semantics:
// - counts all instances when remote GET is followed by storing of the new object (version) locally
// - does _not_ count assorted calls to `GetObjReader` (e.g., via tcb/tco -> LDP.Reader)
GetColdCount = "get.cold.n"
GetColdSize = "get.cold.size"

Expand Down
1 change: 1 addition & 0 deletions xact/xs/tcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func (r *XactTCB) _recv(hdr *transport.ObjHdr, objReader io.Reader, lom *core.LO
params.Reader = io.NopCloser(objReader)
params.Cksum = hdr.ObjAttrs.Cksum
params.Xact = r
params.Size = hdr.ObjAttrs.Size

// Transaction is used only by CopyBucket and ETL. In both cases new objects
// are created at the destination. Setting `OwtPut` type informs `t.PutObject()`
Expand Down
1 change: 1 addition & 0 deletions xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (r *XactTCObjs) _put(hdr *transport.ObjHdr, objReader io.Reader, lom *core.
params.Reader = io.NopCloser(objReader)
params.Cksum = hdr.ObjAttrs.Cksum
params.Xact = r
params.Size = hdr.ObjAttrs.Size

// Transaction is used only by CopyBucket and ETL. In both cases, new objects
// are created at the destination. Setting `OwtPut` type informs `t.PutObject()`
Expand Down

0 comments on commit 6e4414b

Please sign in to comment.