extend 'copy bucket' to sync remote
* core: amend 'copy(object | reader)' datapath
* with partial rewrite
* part three, prev. commit: bec1dd5

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Dec 15, 2023
1 parent e00a1ea commit c38e67f
Showing 9 changed files with 126 additions and 164 deletions.
9 changes: 1 addition & 8 deletions ais/target.go
@@ -1359,14 +1359,7 @@ func (t *target) objMv(lom *cluster.LOM, msg *apc.ActMsg) (err error) {
coi.finalize = true
}

if lom.Bck().IsRemote() || coi.bckTo.IsRemote() {
// when either one or both buckets are remote
coi.dp = &cluster.LDP{}
_, err = coi.copyReader(lom)
} else {
_, err = coi.copyObject(lom)
}

_, err = coi.do(lom)
slab.Free(buf)
freeCOI(coi)
if err != nil {
16 changes: 9 additions & 7 deletions ais/test/bucket_test.go
@@ -2328,14 +2328,14 @@ func TestCopyBucket(t *testing.T) {
testName = fmt.Sprintf("src-remote-evicted/dst-remote=%t/", test.dstRemote)
}
if test.dstBckExist {
testName += "present/"
testName += "dst-present/"
if test.dstBckHasObjects {
testName += "with_objs"
} else {
testName += "without_objs"
}
} else {
testName += "absent"
testName += "dst-absent"
}
if test.multipleDests {
testName += "/multiple_dests"
@@ -2468,12 +2468,13 @@ func TestCopyBucket(t *testing.T) {
err error
cmsg = &apc.CopyBckMsg{Force: true}
)
tlog.Logf("copying %s => %s\n", srcm.bck, dstm.bck)
if test.evictRemoteSrc {
uuid, err = api.CopyBucket(baseParams, srcm.bck, dstm.bck, cmsg, apc.FltExists)
} else {
uuid, err = api.CopyBucket(baseParams, srcm.bck, dstm.bck, cmsg)
}
tassert.CheckFatal(t, err)
tlog.Logf("copying %s => %s: %s\n", srcm.bck, dstm.bck, uuid)
if uuids := strings.Split(uuid, xact.UUIDSepa); len(uuids) > 1 {
for _, u := range uuids {
tassert.Fatalf(t, xact.IsValidUUID(u), "invalid UUID %q", u)
@@ -2483,7 +2484,6 @@ func TestCopyBucket(t *testing.T) {
tassert.Fatalf(t, xact.IsValidUUID(uuid), "invalid UUID %q", uuid)
xactIDs = append(xactIDs, uuid)
}
tassert.CheckFatal(t, err)
}

for _, uuid := range xactIDs {
@@ -2555,7 +2555,8 @@ func TestCopyBucket(t *testing.T) {
dstBckList, err := api.ListObjects(baseParams, dstm.bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
if len(dstBckList.Entries) != expectedObjCount {
t.Fatalf("list_objects: dst %d != %d src", len(dstBckList.Entries), expectedObjCount)
t.Fatalf("list_objects: dst %s, cnt %d != %d cnt, src %s",
dstm.bck.Cname(""), len(dstBckList.Entries), expectedObjCount, srcm.bck.Cname(""))
}

tlog.Logf("verifying that %d copied objects have identical props\n", expectedObjCount)
@@ -2566,14 +2567,15 @@ func TestCopyBucket(t *testing.T) {
found = true

if dstm.bck.IsRemote() && dstmProps.Versioning.Enabled {
tassert.Fatalf(t, b.Version != "", "Expected non-empty object %q version", b.Name)
tassert.Fatalf(t, b.Version != "",
"Expected non-empty object %q version", b.Name)
}

break
}
}
if !found {
t.Fatalf("%s is missing in the copied objects", srcm.bck.Cname(a.Name))
t.Fatalf("%s is missing in the destination bucket %s", srcm.bck.Cname(a.Name), dstm.bck.Cname(""))
}
}
}
15 changes: 4 additions & 11 deletions ais/tgtimpl.go
@@ -148,17 +148,10 @@ func (t *target) CopyObject(lom *cluster.LOM, dm cluster.DataMover, dp cluster.D
coi.objnameTo = lom.ObjName
}

switch {
case dp != nil: // 1. w/ transformation
size, err = coi.copyReader(lom)
case lom.Bck().IsRemote() || coi.bckTo.IsRemote(): // 2. when either one or both buckets are remote
coi.dp = &cluster.LDP{}
size, err = coi.copyReader(lom)
default: // 3.
size, err = coi.copyObject(lom)
}
coi.objsAdd(size, err)
size, err = coi.do(lom)
coi.stats(size, err)
freeCOI(coi)

return size, err
}

@@ -337,7 +330,7 @@ func (t *target) _promRemote(params *cluster.PromoteParams, lom *cluster.LOM, ts
coi.xact = params.Xact
coi.config = params.Config
}
size, err := coi.sendRemote(lom, lom.ObjName, tsi)
size, err := coi.send(lom, lom.ObjName, tsi)
freeCOI(coi)
return size, err
}
222 changes: 100 additions & 122 deletions ais/tgtobj.go
@@ -1310,140 +1310,82 @@ func (a *apndOI) pack(workFQN string) string {
}

//
// COPY(object)
// COPY (object | reader)
//

func (coi *copyOI) objsAdd(size int64, err error) {
if err == nil && coi.xact != nil {
coi.xact.ObjsAdd(1, size)
}
}

func (coi *copyOI) copyObject(lom *cluster.LOM) (size int64, err error) {
debug.Assert(coi.dp == nil)

// TODO -- FIXME: redundant check?
// main method
func (coi *copyOI) do(lom *cluster.LOM) (size int64, err error) {
smap := coi.t.owner.smap.Get()
tsi, err := smap.HrwName2T(coi.bckTo.MakeUname(coi.objnameTo))
if err != nil {
return 0, err
tsi, errN := smap.HrwName2T(coi.bckTo.MakeUname(coi.objnameTo))
if errN != nil {
return 0, errN
}
if tsi.ID() != coi.t.SID() {
// dst location is tsi
return coi.sendRemote(lom, coi.objnameTo, tsi)
if coi.dp == nil && lom.Bck().IsRemote() {
coi.dp = &cluster.LDP{}
}
return coi.send(lom, coi.objnameTo, tsi)
}

// dry-run
if coi.dryRun {
// TODO: replace with something similar to lom.FQN == dst.FQN, but dstBck might not exist.
if lom.Bck().Equal(coi.bckTo, true /*same ID*/, true /*same backend*/) && lom.ObjName == coi.objnameTo {
return 0, nil
}
return lom.SizeBytes(), nil
return coi._dryRun(lom, coi.objnameTo)
}

// copy here
// 1. with transformation
// 2. when the source bucket is remote
// 3. regular
dst := cluster.AllocLOM(coi.objnameTo)
defer cluster.FreeLOM(dst)
if err = dst.InitBck(coi.bckTo.Bucket()); err != nil {
return
}
if lom.FQN == dst.FQN { // resilvering with a single mountpath?
return
}
exclusive := lom.Uname() == dst.Uname()
lom.Lock(exclusive)
defer lom.Unlock(exclusive)
if err = lom.Load(false /*cache it*/, true /*locked*/); err != nil {
if !cmn.IsObjNotExist(err) {
err = cmn.NewErrFailedTo(coi.t, "coi-load", lom, err)
}
return
if err := dst.InitBck(coi.bckTo.Bucket()); err != nil {
cluster.FreeLOM(dst)
return 0, err
}
// w-lock the destination unless overwriting the source
if lom.Uname() != dst.Uname() {
dst.Lock(true)
defer dst.Unlock(true)
if err = dst.Load(false /*cache it*/, true /*locked*/); err == nil {
if lom.EqCksum(dst.Checksum()) {
return
}
} else if cmn.IsErrBucketNought(err) {
return
}
switch {
case coi.dp != nil: // 1.
size, err = coi._reader(lom, dst)
case lom.Bck().IsRemote(): // 2.
coi.dp = &cluster.LDP{}
size, err = coi._reader(lom, dst)
default: // 3.
size, err = coi._regular(lom, dst)
}
dst2, err2 := lom.Copy2FQN(dst.FQN, coi.buf)
if err2 == nil {
size = lom.SizeBytes()
if coi.finalize {
coi.t.putMirror(dst2)
cluster.FreeLOM(dst)
return size, err
}

func (coi *copyOI) _dryRun(lom *cluster.LOM, objnameTo string) (size int64, err error) {
if coi.dp == nil {
if lom.Uname() != coi.bckTo.MakeUname(objnameTo) {
size = lom.SizeBytes()
}
return size, nil
}
err = err2
if dst2 != nil {
cluster.FreeLOM(dst2)

// discard the reader and be done
var reader io.ReadCloser
if reader, _, err = coi.dp.Reader(lom); err != nil {
return 0, err
}
return
size, err = io.Copy(io.Discard, reader)
reader.Close()
return size, err
}

/////////////////
// COPY READER //
/////////////////

// copyReader writes a new object that it reads using a special reader returned by
// coi.DP.Reader(lom).
//
// The reader is responsible for any read-locking of the source LOM, if necessary.
// (If the reader doesn't rlock the object's content may be subject to changing in the middle
// of copying/transforming, etc.)
// PUT DP(lom) => dst
// The DP reader is responsible for any read-locking of the source lom.
//
LOM can be a "pure" metadata of a (non-existing) Cloud object. Accordingly, DP's reader must
be able to handle cold get, warm get, etc.
NOTE: no assumptions are being made on whether the source lom is present in cluster.
(can be a "pure" metadata of a (non-existing) Cloud object; accordingly, DP's reader must
be able to handle cold get, warm get, etc.)
//
// If destination bucket is remote, copyReader will:
// If destination bucket is remote:
// - create a local replica of the object on one of the targets, and
// - PUT to the relevant backend
// An option for _not_ storing the object _in_ the cluster would be a _feature_ that can be
// further debated.
func (coi *copyOI) copyReader(lom *cluster.LOM) (size int64, err error) {
var (
tsi *meta.Snode
smap = coi.t.owner.smap.Get()
)
// TODO -- FIXME: may be redundant, needless at this point
tsi, err = smap.HrwName2T(coi.bckTo.MakeUname(coi.objnameTo))
if err != nil {
return
}
if tsi.ID() != coi.t.SID() {
// remote dst
return coi.sendRemote(lom, coi.objnameTo, tsi)
}

if coi.dryRun {
// discard the reader and be done
return coi.dryRunCopyReader(lom)
}

local dst (NOTE: no assumptions on whether the (src) lom is present)
dst := cluster.AllocLOM(coi.objnameTo)
size, err = coi.putReader(lom, dst)
cluster.FreeLOM(dst)

return
}

// PUT DP(lom) => dst
func (coi *copyOI) putReader(lom, dst *cluster.LOM) (size int64, err error) {
var (
reader cos.ReadOpenCloser
oah cos.OAH
)
if err = dst.InitBck(coi.bckTo.Bucket()); err != nil {
return
}
if reader, oah, err = coi.dp.Reader(lom); err != nil {
return
func (coi *copyOI) _reader(lom, dst *cluster.LOM) (size int64, err error) {
reader, oah, errN := coi.dp.Reader(lom)
if errN != nil {
return 0, errN
}
if lom.Bck().Equal(coi.bckTo, true, true) {
dst.SetVersion(oah.Version())
@@ -1476,16 +1418,49 @@ func (coi *copyOI) putReader(lom, dst *cluster.LOM) (size int64, err error) {
return size, err
}

func (coi *copyOI) dryRunCopyReader(lom *cluster.LOM) (size int64, err error) {
var reader io.ReadCloser
if reader, _, err = coi.dp.Reader(lom); err != nil {
return 0, err
func (coi *copyOI) _regular(lom, dst *cluster.LOM) (size int64, err error) {
if lom.FQN == dst.FQN { // resilvering with a single mountpath?
return
}
exclusive := lom.Uname() == dst.Uname()
lom.Lock(exclusive)
defer lom.Unlock(exclusive)
if err = lom.Load(false /*cache it*/, true /*locked*/); err != nil {
if !cmn.IsObjNotExist(err) {
err = cmn.NewErrFailedTo(coi.t, "coi-load", lom, err)
}
return
}
// w-lock the destination unless overwriting the source
if lom.Uname() != dst.Uname() {
dst.Lock(true)
defer dst.Unlock(true)
if err = dst.Load(false /*cache it*/, true /*locked*/); err == nil {
if lom.EqCksum(dst.Checksum()) {
return
}
} else if cmn.IsErrBucketNought(err) {
return
}
}
dst2, err2 := lom.Copy2FQN(dst.FQN, coi.buf)
if err2 == nil {
size = lom.SizeBytes()
if coi.finalize {
coi.t.putMirror(dst2)
}
}
defer reader.Close()
return io.Copy(io.Discard, reader)
err = err2
if dst2 != nil {
cluster.FreeLOM(dst2)
}
return
}

func (coi *copyOI) sendRemote(lom *cluster.LOM, objNameTo string, tsi *meta.Snode) (size int64, err error) {
// send object => designated target
// * source is a LOM or a reader (that may be reading from remote)
// * one of the two equivalent transmission mechanisms: PUT or transport Send
func (coi *copyOI) send(lom *cluster.LOM, objNameTo string, tsi *meta.Snode) (size int64, err error) {
debug.Assert(coi.owt > 0)
sargs := allocSnda()
{
@@ -1498,15 +1473,12 @@ func (coi *copyOI) sendRemote(lom *cluster.LOM, objNameTo string, tsi *meta.Snod
sargs.owt = coi.owt
}
}
size, err = coi.doSend(lom, sargs)
size, err = coi._send(lom, sargs)
freeSnda(sargs)
return
}

// send object => designated target
// * source is a LOM or a reader (that may be reading from remote)
// * one of the two equivalent transmission mechanisms: PUT or transport Send
func (coi *copyOI) doSend(lom *cluster.LOM, sargs *sendArgs) (size int64, err error) {
func (coi *copyOI) _send(lom *cluster.LOM, sargs *sendArgs) (size int64, err error) {
if coi.dm != nil {
// clone the `lom` to use it in the async operation (free it via `_sendObjDM` callback)
lom = lom.CloneMD(lom.FQN)
@@ -1630,6 +1602,12 @@ func (coi *copyOI) put(sargs *sendArgs) error {
return nil
}

func (coi *copyOI) stats(size int64, err error) {
if err == nil && coi.xact != nil {
coi.xact.ObjsAdd(1, size)
}
}

//
// PUT a new shard _or_ APPEND to an existing one (w/ read/write/list via cmn/archive)
//