Skip to content

Commit

Permalink
cos: add read-open-close-seeker; CLI: multi-put with client-side chec…
Browse files Browse the repository at this point in the history
…ksumming

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 26, 2025
1 parent 9cf2805 commit de32e6f
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 80 deletions.
18 changes: 14 additions & 4 deletions api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,28 @@ func (args *PutArgs) put(reqArgs *cmn.HreqArgs) (*http.Request, error) {
}
// Go http doesn't automatically set this for files, so to handle redirect we do it here.
req.GetBody = args.getBody

// compute client-side checksum
if args.Cksum != nil && args.Cksum.Ty() != cos.ChecksumNone {
req.Header.Set(apc.HdrObjCksumType, args.Cksum.Ty())
ckVal := args.Cksum.Value()
if ckVal == "" {
val := args.Cksum.Value()
if val == "" {
rocs, ok := args.Reader.(io.Seeker)
if !ok {
err := fmt.Errorf("cannot compute client-side checksum: reader (%T) does not implement io.Seeker",
args.Reader)
return nil, cmn.NewErrCreateHreq(err)
}
_, ckhash, err := cos.CopyAndChecksum(io.Discard, args.Reader, nil, args.Cksum.Ty())
if err != nil {
return nil, cmn.NewErrCreateHreq(err)
}
ckVal = hex.EncodeToString(ckhash.Sum())
val = hex.EncodeToString(ckhash.Sum())
rocs.Seek(0, io.SeekStart)
}
req.Header.Set(apc.HdrObjCksumVal, ckVal)
req.Header.Set(apc.HdrObjCksumVal, val)
}

if args.Size != 0 {
req.ContentLength = int64(args.Size) // as per https://tools.ietf.org/html/rfc7230#section-3.3.2
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/cli/arch_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,9 @@ func a2aRegular(c *cli.Context, a *archput) error {
args = barArgs{barType: sizeArg, barText: a.dst.oname, total: fi.Size()}
)
progress, bars = simpleBar(args)

cb := func(n int, _ error) { bars[0].IncrBy(n) }
reader = cos.NewCallbackReadOpenCloser(fh, cb)
reader = newRocCb(fh, cb, 0)
}
putArgs := api.PutArgs{
BaseParams: apiBP,
Expand Down
63 changes: 63 additions & 0 deletions cmd/cli/cli/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Package cli provides easy-to-use commands to manage, monitor, and utilize AIS clusters.
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*/
package cli

import (
"io"
"math"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
)

type rocCb struct {
roc cos.ROCS
cb func(int, error)
readBytes int // bytes read since last `Open`.
reportedBytes int // vs reopen
}

// interface guard
var (
_ cos.ReadOpenCloser = (*rocCb)(nil)
_ io.Seeker = (*rocCb)(nil)
)

///////////
// rocCb //
///////////

func newRocCb(roc cos.ROCS, readCb func(int, error), reportedBytes int) *rocCb {
return &rocCb{
roc: roc,
cb: readCb,
reportedBytes: reportedBytes,
}
}

func (r *rocCb) Read(p []byte) (n int, err error) {
n, err = r.roc.Read(p)
debug.Assert(r.readBytes < math.MaxInt-n)
r.readBytes += n
if delta := r.readBytes - r.reportedBytes; delta > 0 {
r.cb(delta, err)
r.reportedBytes += delta
}
return n, err
}

func (r *rocCb) Open() (cos.ReadOpenCloser, error) {
roc2, err := r.roc.OpenDup()
if err != nil {
return nil, err
}
return newRocCb(roc2, r.cb, r.reportedBytes), nil
}

func (r *rocCb) Close() error { return r.roc.Close() }

func (r *rocCb) Seek(offset int64, whence int) (int64, error) {
return r.roc.Seek(offset, whence)
}
8 changes: 4 additions & 4 deletions cmd/cli/cli/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func appendDefaultDecorators(options []mpb.BarOption) []mpb.BarOption {
// progIndicator -- TODO: reimplement via simpleBar()
///////////////////

func newProgIndicator(objName string) *progIndicator {
return &progIndicator{objName, atomic.NewInt64(0)}
}

func (*progIndicator) start() { fmt.Print("\033[s") }
func (*progIndicator) stop() { fmt.Println("") }

func (pi *progIndicator) printProgress(incr int64) {
fmt.Print("\033[u\033[K")
fmt.Printf("Uploaded %s: %s", pi.objName, cos.ToSizeIEC(pi.sizeTransferred.Add(incr), 2))
}

func newProgIndicator(objName string) *progIndicator {
return &progIndicator{objName, atomic.NewInt64(0)}
}
20 changes: 13 additions & 7 deletions cmd/cli/cli/verbfobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
var (
err error
skipVC = flagIsSet(c, skipVerCksumFlag)
countReader = cos.NewCallbackReadOpenCloser(fh, updateBar /*progress callback*/)
countReader = newRocCb(fh, updateBar /*progress callback*/, 0)
iters = 1
isTout bool
)
Expand All @@ -354,12 +354,12 @@ func (u *uctx) do(c *cli.Context, p *uparams, fobj fobj, fh *cos.FileHandle, upd
fmt.Fprintln(c.App.ErrWriter, s)
briefPause(1)

ffh, errO := fh.Open()
ffh, errO := fh.OpenDup()
if errO != nil {
fmt.Fprintf(c.App.ErrWriter, "failed to reopen %s: %v\n", fobj.path, errO)
break
}
countReader = cos.NewCallbackReadOpenCloser(ffh, updateBar /*progress callback*/)
countReader = newRocCb(ffh, updateBar /*progress callback*/, 0)
isTout = isTimeout(e)
}
}
Expand Down Expand Up @@ -436,8 +436,9 @@ func putRegular(c *cli.Context, bck cmn.Bck, objName, path string, finfo os.File
// setup progress bar
args := barArgs{barType: sizeArg, barText: objName, total: finfo.Size()}
progress, bars = simpleBar(args)

cb := func(n int, _ error) { bars[0].IncrBy(n) }
reader = cos.NewCallbackReadOpenCloser(fh, cb)
reader = newRocCb(fh, cb, 0)
}

putArgs := api.PutArgs{
Expand Down Expand Up @@ -513,10 +514,13 @@ func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, c
if n == 0 {
break
}
reader = cos.NewByteHandle(b.Bytes())

fh := cos.NewByteHandle(b.Bytes())
reader = fh
if flagIsSet(c, progressFlag) {
actualChunkOffset := atomic.NewInt64(0)
reader = cos.NewCallbackReadOpenCloser(reader, func(n int, _ error) {

readCb := func(n int, _ error) {
if n == 0 {
return
}
Expand All @@ -529,7 +533,9 @@ func putAppendChunks(c *cli.Context, bck cmn.Bck, objName string, r io.Reader, c
return
}
pi.printProgress(int64(n))
})
}

reader = newRocCb(fh, readCb, 0)
}
if i == 0 {
// overwrite, if exists
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.23.4

require (
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81
github.com/fatih/color v1.18.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.21.0
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26 h1:hcVGV1Djg564OyGQiicpm3RIjaUA58dynb9T8kCPFCY=
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26/go.mod h1:rbl6S+ehV6m949UsHJA1Iz2+76/lZHwDm/KcfUnt9fw=
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81 h1:9Yo70VHJI2cYoAw4motUWnIL0HaqCle5/7/s4KnEWMA=
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81/go.mod h1:rbl6S+ehV6m949UsHJA1Iz2+76/lZHwDm/KcfUnt9fw=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
2 changes: 1 addition & 1 deletion cmd/ishard/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/ishard
go 1.23.4

require (
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81
github.com/json-iterator/go v1.1.12
github.com/vbauerster/mpb/v4 v4.12.2
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/ishard/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26 h1:hcVGV1Djg564OyGQiicpm3RIjaUA58dynb9T8kCPFCY=
github.com/NVIDIA/aistore v1.3.26-0.20250125181449-e08cc6399f26/go.mod h1:rbl6S+ehV6m949UsHJA1Iz2+76/lZHwDm/KcfUnt9fw=
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81 h1:9Yo70VHJI2cYoAw4motUWnIL0HaqCle5/7/s4KnEWMA=
github.com/NVIDIA/aistore v1.3.26-0.20250126173855-5d8f2d149c81/go.mod h1:rbl6S+ehV6m949UsHJA1Iz2+76/lZHwDm/KcfUnt9fw=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
Expand Down
74 changes: 16 additions & 58 deletions cmn/cos/io.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package cos provides common low-level types and utilities for all aistore projects
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2025, NVIDIA CORPORATION. All rights reserved.
*/
package cos

Expand Down Expand Up @@ -39,6 +39,11 @@ type (
io.ReadCloser
Open() (ReadOpenCloser, error)
}
ROCS interface {
ReadOpenCloser
io.Seeker
OpenDup() (ROCS, error)
}
ReadSizer interface {
io.Reader
Size() int64
Expand Down Expand Up @@ -80,16 +85,6 @@ type (
ReadCloseSizer
cb func()
}
CallbackROC struct {
roc ReadOpenCloser
readCallback func(int, error)
// Number of bytes we've already read, counting from last `Open`.
readBytes int
// Since we could possibly reopen a reader we must keep track of the
// bytes we already reported to `readCallback` so there is no duplications.
// This value is preserved across all the `Open`'s.
reportedBytes int
}
ReaderArgs struct {
R io.Reader
ReadCb func(int, error)
Expand Down Expand Up @@ -163,14 +158,16 @@ type (

// interface guard
var (
_ io.Reader = (*nopReader)(nil)
_ ReadOpenCloser = (*FileHandle)(nil)
_ ReadOpenCloser = (*CallbackROC)(nil)
_ ReadSizer = (*sizedReader)(nil)
_ io.Reader = (*nopReader)(nil)

_ ROCS = (*FileHandle)(nil)
_ ROCS = (*ByteHandle)(nil)

_ ReadSizer = (*sizedReader)(nil)

_ ReadOpenCloser = (*SectionHandle)(nil)
_ ReadOpenCloser = (*FileSectionHandle)(nil)
_ ReadOpenCloser = (*nopOpener)(nil)
_ ReadOpenCloser = (*ByteHandle)(nil)
)

///////////////
Expand Down Expand Up @@ -202,6 +199,7 @@ func (r *nopReader) Read(b []byte) (int, error) {
func NewByteHandle(bt []byte) *ByteHandle { return &ByteHandle{bytes.NewReader(bt), bt} }
func (*ByteHandle) Close() error { return nil }
func (b *ByteHandle) Open() (ReadOpenCloser, error) { return NewByteHandle(b.b), nil }
func (b *ByteHandle) OpenDup() (ROCS, error) { return NewByteHandle(b.b), nil }

///////////////
// nopOpener //
Expand All @@ -222,9 +220,8 @@ func NewFileHandle(fqn string) (*FileHandle, error) {
return &FileHandle{file, fqn}, nil
}

func (f *FileHandle) Open() (ReadOpenCloser, error) {
return NewFileHandle(f.fqn)
}
func (f *FileHandle) Open() (ReadOpenCloser, error) { return NewFileHandle(f.fqn) }
func (f *FileHandle) OpenDup() (ROCS, error) { return NewFileHandle(f.fqn) }

////////////
// Sized* //
Expand All @@ -250,45 +247,6 @@ func (r *deferRCS) Close() (err error) {
return
}

/////////////////
// CallbackROC //
/////////////////

func NewCallbackReadOpenCloser(r ReadOpenCloser, readCb func(int, error), reportedBytes ...int) *CallbackROC {
var rb int
if len(reportedBytes) > 0 {
rb = reportedBytes[0]
}
return &CallbackROC{
roc: r,
readCallback: readCb,
readBytes: 0,
reportedBytes: rb,
}
}

func (r *CallbackROC) Read(p []byte) (n int, err error) {
n, err = r.roc.Read(p)
debug.Assert(r.readBytes < math.MaxInt-n)
r.readBytes += n
if r.readBytes > r.reportedBytes {
diff := r.readBytes - r.reportedBytes
r.readCallback(diff, err)
r.reportedBytes += diff
}
return n, err
}

func (r *CallbackROC) Open() (ReadOpenCloser, error) {
rc, err := r.roc.Open()
if err != nil {
return rc, err
}
return NewCallbackReadOpenCloser(rc, r.readCallback, r.reportedBytes), nil
}

func (r *CallbackROC) Close() error { return r.roc.Close() }

////////////////////
// ReaderWithArgs //
////////////////////
Expand Down

0 comments on commit de32e6f

Please sign in to comment.