Skip to content

Commit

Permalink
extend 'copy bucket' to sync remote
Browse files Browse the repository at this point in the history
* core: amend 'copy(object | reader)' datapath (cont-d)
* with checksum
* `sync-remote` vs `prepend`
* tcb: remove rlock
* part four

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 15, 2023
1 parent c38e67f commit 385becc
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 113 deletions.
10 changes: 6 additions & 4 deletions ais/htcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,12 @@ func (cii *clusterInfo) fill(h *htrun) {
func (cii *clusterInfo) fillSmap(smap *smapX) {
cii.Smap.Version = smap.version()
cii.Smap.UUID = smap.UUID
cii.Smap.Primary.CtrlURL = smap.Primary.URL(cmn.NetIntraControl)
cii.Smap.Primary.PubURL = smap.Primary.URL(cmn.NetPublic)
cii.Smap.Primary.ID = smap.Primary.ID()
cii.Flags.VoteInProgress = voteInProgress() != nil
if smap.Primary != nil {
cii.Smap.Primary.CtrlURL = smap.Primary.URL(cmn.NetIntraControl)
cii.Smap.Primary.PubURL = smap.Primary.URL(cmn.NetPublic)
cii.Smap.Primary.ID = smap.Primary.ID()
cii.Flags.VoteInProgress = voteInProgress() != nil
}
}

func (cii *clusterInfo) smapEqual(other *clusterInfo) (ok bool) {
Expand Down
167 changes: 81 additions & 86 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type (

copyOI struct {
dm cluster.DataMover
dp cluster.DP
dp cluster.DP // transform via: ext/etl/dp.go or cluster/lom_dp.go
xact cluster.Xact
t *target
config *cmn.Config
Expand Down Expand Up @@ -1315,40 +1315,39 @@ func (a *apndOI) pack(workFQN string) string {

// main method
func (coi *copyOI) do(lom *cluster.LOM) (size int64, err error) {
if coi.dryRun {
return coi._dryRun(lom, coi.objnameTo)
}

// DP == nil: use default (no-op transform) if source bucket is remote
if coi.dp == nil && lom.Bck().IsRemote() {
coi.dp = &cluster.LDP{}
}

// 1: dst location
smap := coi.t.owner.smap.Get()
tsi, errN := smap.HrwName2T(coi.bckTo.MakeUname(coi.objnameTo))
if errN != nil {
return 0, errN
}
if tsi.ID() != coi.t.SID() {
if coi.dp == nil && lom.Bck().IsRemote() {
coi.dp = &cluster.LDP{}
}
return coi.send(lom, coi.objnameTo, tsi)
}

if coi.dryRun {
return coi._dryRun(lom, coi.objnameTo)
}

// 1. with transformation
// 2. when the source bucket is remote
// 3. regular
// dst is this target
// 2, 3: with transformation and without
dst := cluster.AllocLOM(coi.objnameTo)
if err := dst.InitBck(coi.bckTo.Bucket()); err != nil {
cluster.FreeLOM(dst)
return 0, err
}
switch {
case coi.dp != nil: // 1.
size, err = coi._reader(lom, dst)
case lom.Bck().IsRemote(): // 2.
coi.dp = &cluster.LDP{}
if coi.dp != nil {
size, err = coi._reader(lom, dst)
default: // 3.
} else {
size, err = coi._regular(lom, dst)
}
cluster.FreeLOM(dst)

return size, err
}

Expand Down Expand Up @@ -1382,24 +1381,25 @@ func (coi *copyOI) _dryRun(lom *cluster.LOM, objnameTo string) (size int64, err
// - PUT to the relevant backend
// An option for _not_ storing the object _in_ the cluster would be a _feature_ that can be
// further debated.
func (coi *copyOI) _reader(lom, dst *cluster.LOM) (size int64, err error) {
func (coi *copyOI) _reader(lom, dst *cluster.LOM) (size int64, _ error) {
reader, oah, errN := coi.dp.Reader(lom)
if errN != nil {
return 0, errN
}
if lom.Bck().Equal(coi.bckTo, true, true) {
dst.SetVersion(oah.Version())
}

poi := allocPOI()
{
poi.t = coi.t
poi.lom = dst
poi.config = coi.config
poi.r = reader
poi.xctn = coi.xact // on behalf of
poi.workFQN = fs.CSM.Gen(dst, fs.WorkfileType, "copy-dp")
poi.atime = lom.Atime().UnixNano()
poi.xctn = coi.xact
// TODO -- FIXME: checksum
poi.atime = oah.AtimeUnix()
poi.cksumToUse = oah.Checksum()
}
switch {
case coi.syncRemote:
Expand All @@ -1409,7 +1409,7 @@ func (coi *copyOI) _reader(lom, dst *cluster.LOM) (size int64, err error) {
default:
poi.owt = cmn.OwtMigrate
}
_, err = poi.putObject()
_, err := poi.putObject()
freePOI(poi)
if err == nil {
// xaction stats: inc locally processed (and see data mover for in and out objs)
Expand All @@ -1418,43 +1418,44 @@ func (coi *copyOI) _reader(lom, dst *cluster.LOM) (size int64, err error) {
return size, err
}

func (coi *copyOI) _regular(lom, dst *cluster.LOM) (size int64, err error) {
func (coi *copyOI) _regular(lom, dst *cluster.LOM) (size int64, _ error) {
if lom.FQN == dst.FQN { // resilvering with a single mountpath?
return
}
exclusive := lom.Uname() == dst.Uname()
lom.Lock(exclusive)
defer lom.Unlock(exclusive)
if err = lom.Load(false /*cache it*/, true /*locked*/); err != nil {
lcopy := lom.Uname() == dst.Uname() // n-way copy
lom.Lock(lcopy)
defer lom.Unlock(lcopy)

if err := lom.Load(false /*cache it*/, true /*locked*/); err != nil {
if !cmn.IsObjNotExist(err) {
err = cmn.NewErrFailedTo(coi.t, "coi-load", lom, err)
}
return
return 0, err
}
// w-lock the destination unless overwriting the source
if lom.Uname() != dst.Uname() {

// w-lock the destination unless already locked (above)
if !lcopy {
dst.Lock(true)
defer dst.Unlock(true)
if err = dst.Load(false /*cache it*/, true /*locked*/); err == nil {
if err := dst.Load(false /*cache it*/, true /*locked*/); err == nil {
if lom.EqCksum(dst.Checksum()) {
return
return 0, nil
}
} else if cmn.IsErrBucketNought(err) {
return
return 0, err
}
}
dst2, err2 := lom.Copy2FQN(dst.FQN, coi.buf)
if err2 == nil {
dst2, err := lom.Copy2FQN(dst.FQN, coi.buf)
if err == nil {
size = lom.SizeBytes()
if coi.finalize {
coi.t.putMirror(dst2)
}
}
err = err2
if dst2 != nil {
cluster.FreeLOM(dst2)
}
return
return size, err
}

// send object => designated target
Expand All @@ -1478,74 +1479,68 @@ func (coi *copyOI) send(lom *cluster.LOM, objNameTo string, tsi *meta.Snode) (si
return
}

func (coi *copyOI) _send(lom *cluster.LOM, sargs *sendArgs) (size int64, err error) {
func (coi *copyOI) _send(lom *cluster.LOM, sargs *sendArgs) (size int64, _ error) {
debug.Assert(!coi.dryRun)
if coi.dm != nil {
// clone the `lom` to use it in the async operation (free it via `_sendObjDM` callback)
lom = lom.CloneMD(lom.FQN)
}
if coi.dp == nil {
var reader cos.ReadOpenCloser
if coi.owt != cmn.OwtPromote {
// 1. migrate/replicate lom
lom.Lock(false)
err := lom.Load(false /*cache it*/, true /*locked*/)
if err != nil {
lom.Unlock(false)

switch {
case coi.owt == cmn.OwtPromote:
// 1. promote
debug.Assert(coi.dp == nil)
debug.Assert(sargs.owt == cmn.OwtPromote)

fh, err := cos.NewFileHandle(lom.FQN)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
if coi.dryRun {
lom.Unlock(false)
return lom.SizeBytes(), nil
}
if reader, err = lom.NewDeferROC(); err != nil {
return 0, err
}
size = lom.SizeBytes()
} else {
// 2. promote local file
debug.Assert(!coi.dryRun)
debug.Assert(sargs.owt == cmn.OwtPromote)
fh, err := cos.NewFileHandle(lom.FQN)
if err != nil {
if os.IsNotExist(err) {
return 0, nil
}
return 0, cmn.NewErrFailedTo(coi.t, "open", lom.FQN, err)
}
fi, err := fh.Stat()
if err != nil {
fh.Close()
return 0, cmn.NewErrFailedTo(coi.t, "fstat", lom.FQN, err)
}
size = fi.Size()
reader = fh
return 0, cmn.NewErrFailedTo(coi.t, "open", lom.FQN, err)
}
sargs.reader = reader
sargs.objAttrs = lom
} else {
// 3. get a reader for this object
// - iff the object is not present ("cached"):
// - call t.Backend.GetObjReader, a variation of cold-GET
if sargs.reader, sargs.objAttrs, err = coi.dp.Reader(lom); err != nil {
return
fi, err := fh.Stat()
if err != nil {
fh.Close()
return 0, cmn.NewErrFailedTo(coi.t, "fstat", lom.FQN, err)
}
size = fi.Size()
sargs.reader, sargs.objAttrs = fh, lom
case coi.dp == nil:
// 2. migrate/replicate lom

lom.Lock(false)
if err := lom.Load(false /*cache it*/, true /*locked*/); err != nil {
lom.Unlock(false)
return 0, nil
}
reader, err := lom.NewDeferROC()
if err != nil {
return 0, err
}
if coi.dryRun {
size, err = io.Copy(io.Discard, sargs.reader)
cos.Close(sargs.reader)
size = lom.SizeBytes()
sargs.reader, sargs.objAttrs = reader, lom
default:
// 3. DP transform (possibly, no-op)
// If the object is not present call t.Backend.GetObjReader
reader, oah, err := coi.dp.Reader(lom)
if err != nil {
return
}
// NOTE: returns the cos.ContentLengthUnknown (-1) if post-transform size not known.
size = sargs.objAttrs.SizeBytes()
// returns cos.ContentLengthUnknown (-1) if post-transform size is unknown
size = oah.SizeBytes()
sargs.reader, sargs.objAttrs = reader, oah
}

// do
var err error
sargs.bckTo = coi.bckTo
if sargs.dm != nil {
err = coi._dm(lom /*for attrs*/, sargs)
} else {
err = coi.put(sargs)
}
return
return size, err
}

// use data mover to transmit objects to other targets
Expand Down
2 changes: 1 addition & 1 deletion cluster/lom_dp.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (lom *LOM) NewDeferROC() (cos.ReadOpenCloser, error) {
return nil, cmn.NewErrFailedTo(g.t, "open", lom.FQN, err)
}

// compare with etl/dp.go
// compare with ext/etl/dp.go
func (*LDP) Reader(lom *LOM) (cos.ReadOpenCloser, cos.OAH, error) {
lom.Lock(false)
loadErr := lom.Load(false /*cache it*/, true /*locked*/)
Expand Down
32 changes: 26 additions & 6 deletions mirror/tcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (p *tcbFactory) Start() error {

// sync same-name remote
p.xctn.syncRemote = p.kind != apc.ActETLBck &&
p.args.Msg.CopyBckMsg.Prepend == "" &&
p.args.BckFrom.Equal(p.args.BckTo, true /*same BID*/, true /*same backend*/)

// refcount OpcTxnDone; this target must ve active (ref: ignoreMaintenance)
Expand Down Expand Up @@ -226,18 +227,18 @@ func (r *XactTCB) qcb(tot time.Duration) cluster.QuiRes {
func (r *XactTCB) copyObject(lom *cluster.LOM, buf []byte) (err error) {
var (
args = r.p.args // TCBArgs
toName = r.p.args.Msg.ToName(lom.ObjName)
toName = args.Msg.ToName(lom.ObjName)
)
if r.BckJog.Config.FastV(5, cos.SmoduleMirror) {
nlog.Infof("%s: %s => %s", r.Base.Name(), lom.Cname(), args.BckTo.Cname(toName))
}
_, err = r.p.T.CopyObject(lom, r.dm, args.DP, r, r.Config, args.BckTo, toName, buf, args.Msg.DryRun, r.syncRemote)
if err != nil {
if cos.IsErrOOS(err) {
// TODO: call r.Abort() instead
err = cmn.NewErrAborted(r.Name(), "tcb", err)
r.Abort(err)
} else {
r.AddErr(err)
}
r.AddErr(err)
if r.BckJog.Config.FastV(5, cos.SmoduleMirror) {
nlog.Infof("Error: %v", err)
}
Expand Down Expand Up @@ -305,12 +306,31 @@ func (r *XactTCB) _recv(hdr *transport.ObjHdr, objReader io.Reader, lom *cluster

func (r *XactTCB) Args() *xreg.TCBArgs { return r.p.args }

func (r *XactTCB) _str() (s string) {
msg := &r.p.args.Msg.CopyBckMsg
if msg.Prefix != "" {
s = ", prefix " + r.p.args.Msg.Prefix
}
if msg.Prepend != "" {
s = ", prepend " + r.p.args.Msg.Prepend
}
return s
}

func (r *XactTCB) String() string {
return fmt.Sprintf("%s <= %s", r.Base.String(), r.p.args.BckFrom)
s := r._str()
if r.syncRemote {
return fmt.Sprintf("%s%s sync-remote", r.Base.String(), s)
}
return fmt.Sprintf("%s <= %s%s", r.Base.String(), r.p.args.BckFrom.String(), s)
}

func (r *XactTCB) Name() string {
return fmt.Sprintf("%s <= %s", r.Base.Name(), r.p.args.BckFrom)
s := r._str()
if r.syncRemote {
return fmt.Sprintf("%s%s sync-remote", r.Base.Name(), s)
}
return fmt.Sprintf("%s <= %s%s", r.Base.Name(), r.p.args.BckFrom.String(), s)
}

func (r *XactTCB) FromTo() (*meta.Bck, *meta.Bck) {
Expand Down
Loading

0 comments on commit 385becc

Please sign in to comment.