Skip to content

Commit

Permalink
core: revise object-write-transaction (OWT) enum
Browse files Browse the repository at this point in the history
* remove migrate-replicate OWT
* instead introduce { transform, copy, rebalance }; handle separately
* data movers to "inherit" OWT from the parent job
* part two, prev. commit af123b8

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 17, 2024
1 parent ea3efca commit b20bc72
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 50 deletions.
2 changes: 1 addition & 1 deletion ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ func (t *target) objMv(lom *core.LOM, msg *apc.ActMsg) (err error) {
coiParams.ObjnameTo = msg.Name /* new object name */
coiParams.Buf = buf
coiParams.Config = cmn.GCO.Get()
coiParams.OWT = cmn.OwtMigrateRepl
coiParams.OWT = cmn.OwtCopy
coiParams.Finalize = true
}
coi := (*copyOI)(coiParams)
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (t *target) HeadObjT2T(lom *core.LOM, si *meta.Snode) bool {
func (t *target) CopyObject(lom *core.LOM, dm core.DM, params *core.CopyParams) (size int64, err error) {
coi := (*copyOI)(params)
// defaults
coi.OWT = cmn.OwtMigrateRepl
coi.OWT = cmn.OwtCopy
coi.Finalize = false
if coi.ObjnameTo == "" {
coi.ObjnameTo = lom.ObjName
Expand Down
27 changes: 12 additions & 15 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (poi *putOI) fini() (errCode int, err error) {
bck = lom.Bck()
)
// put remote
if bck.IsRemote() && (poi.owt == cmn.OwtPut || poi.owt == cmn.OwtFinalize || poi.owt == cmn.OwtPromote) {
if bck.IsRemote() && poi.owt < cmn.OwtRebalance {
errCode, err = poi.putRemote()
if err != nil {
loghdr := poi.loghdr()
Expand Down Expand Up @@ -317,7 +317,7 @@ func (poi *putOI) fini() (errCode int, err error) {

// ais versioning
if bck.IsAIS() && lom.VersionConf().Enabled {
if poi.owt == cmn.OwtPut || poi.owt == cmn.OwtFinalize || poi.owt == cmn.OwtPromote {
if poi.owt < cmn.OwtRebalance {
if poi.skipVC {
err = lom.IncVersion()
debug.AssertNoErr(err)
Expand All @@ -335,7 +335,7 @@ func (poi *putOI) fini() (errCode int, err error) {
}
if lom.HasCopies() {
if errdc := lom.DelAllCopies(); errdc != nil {
nlog.Errorf("PUT (%s): failed to delete old copies [%v], proceeding to PUT anyway...", poi.loghdr(), errdc)
nlog.Errorf("PUT (%s): failed to delete old copies [%v], proceeding anyway...", poi.loghdr(), errdc)
}
}
if lom.AtimeUnix() == 0 { // (is set when migrating within cluster; prefetch special case)
Expand Down Expand Up @@ -477,7 +477,7 @@ func (poi *putOI) _cleanup(buf []byte, slab *memsys.Slab, lmfh *os.File, err err

func (poi *putOI) validateCksum(c *cmn.CksumConf) (v bool) {
switch poi.owt {
case cmn.OwtMigrateRepl, cmn.OwtPromote, cmn.OwtFinalize:
case cmn.OwtRebalance, cmn.OwtCopy:
v = c.ValidateObjMove
case cmn.OwtPut:
v = true
Expand Down Expand Up @@ -926,7 +926,7 @@ func (goi *getOI) getFromNeighbor(lom *core.LOM, tsi *meta.Snode) bool {
poi.lom = lom
poi.config = config
poi.r = resp.Body
poi.owt = cmn.OwtMigrateRepl
poi.owt = cmn.OwtRebalance
poi.workFQN = workFQN
poi.atime = lom.ObjAttrs().Atime
poi.cksumToUse = cksumToUse
Expand Down Expand Up @@ -1373,16 +1373,14 @@ func (coi *copyOI) _reader(t *target, dm *bundle.DataMover, lom, dst *core.LOM)
poi.lom = dst
poi.config = coi.Config
poi.r = reader
poi.owt = coi.OWT
poi.xctn = coi.Xact // on behalf of
poi.workFQN = fs.CSM.Gen(dst, fs.WorkfileType, "copy-dp")
poi.atime = oah.AtimeUnix()
poi.cksumToUse = oah.Checksum()
}
switch {
case dm != nil:
poi.owt = dm.OWT()
default:
poi.owt = cmn.OwtMigrateRepl
if dm != nil {
poi.owt = dm.OWT() // (compare with _send)
}
errCode, err := poi.putObject()
freePOI(poi)
Expand Down Expand Up @@ -1443,11 +1441,10 @@ func (coi *copyOI) send(t *target, dm *bundle.DataMover, lom *core.LOM, objNameT
sargs.objNameTo = objNameTo
sargs.tsi = tsi
sargs.dm = dm
if dm != nil {
sargs.owt = dm.OWT() // takes precedence
} else {
sargs.owt = coi.OWT
}
sargs.owt = coi.OWT
}
if dm != nil {
sargs.owt = dm.OWT() // takes precedence
}
size, err = coi._send(t, lom, sargs)
freeSnda(sargs)
Expand Down
2 changes: 1 addition & 1 deletion ais/tgts3.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.C
coiParams.Config = config
coiParams.BckTo = bckTo
coiParams.ObjnameTo = s3.ObjName(items)
coiParams.OWT = cmn.OwtMigrateRepl
coiParams.OWT = cmn.OwtCopy
}
coi := (*copyOI)(coiParams)
_, err = coi.do(t, nil /*DM*/, lom)
Expand Down
34 changes: 22 additions & 12 deletions cmn/owt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,28 @@ import (
"github.com/NVIDIA/aistore/cmn/debug"
)

// Object Write Transaction (OWT) is used to control some of the aspects of creating
// new objects in the cluster.
// Object Write Transaction (OWT) controls certain aspects of creating new objects in the cluster.
// In particular, OwtGet* group below simultaneously specifies cold-GET variations
// (that all involve reading from a remote backend) and the associated locking
// (that will always reflect a tradeoff between consistency and parallelism)
// NOTE: enum entries' order is important
type OWT int

const (
OwtPut OWT = iota // PUT
OwtMigrateRepl // migrate or replicate objects within cluster (e.g. global rebalance)
OwtPromote // promote target-accessible files and directories
OwtFinalize // finalize object archives
OwtGetTryLock // if !try-lock(exclusive) { return error }; read from remote; ...
OwtGetLock // lock(exclusive); read from remote; ...
OwtGet // GET (with upgrading read-lock in the local-write path)
OwtGetPrefetchLock // (used for maximum parallelism when prefetching)
// PUT and PUT-like transactions
OwtPut OWT = iota // PUT
OwtPromote // promote target-accessible files and directories
OwtFinalize // finalize object archives and S3 multipart
OwtTransform // ETL
OwtCopy // copy and move objects within cluster
OwtRebalance // NOTE: must be the last in PUT* group
//
// GET and friends
//
OwtGetTryLock // if !try-lock(exclusive) { return error }; read from remote; ...
OwtGetLock // lock(exclusive); read from remote; ...
OwtGet // GET (with upgrading read-lock in the local-write path)
OwtGetPrefetchLock // (used for maximum parallelism when prefetching)
)

func (owt *OWT) FromS(s string) {
Expand All @@ -41,12 +47,16 @@ func (owt OWT) String() (s string) {
switch owt {
case OwtPut:
s = "owt-put"
case OwtMigrateRepl:
s = "owt-migrate"
case OwtPromote:
s = "owt-promote"
case OwtFinalize:
s = "owt-finalize"
case OwtTransform:
s = "owt-transform"
case OwtCopy:
s = "owt-copy"
case OwtRebalance:
s = "owt-rebalance"
case OwtGetTryLock:
s = "owt-get-try-lock"
case OwtGetLock:
Expand Down
3 changes: 1 addition & 2 deletions ec/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ func writeObject(lom *core.LOM, reader io.Reader, size int64, xctn core.Xact) er
params.Atime = time.Now()
params.Size = size
params.Xact = xctn
// to avoid changing version; TODO: introduce cmn.OwtEC
params.OWT = cmn.OwtMigrateRepl
params.OWT = cmn.OwtCopy
}
err := core.T.PutObject(lom, params)
core.FreePutParams(params)
Expand Down
2 changes: 1 addition & 1 deletion reb/globrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func New(config *cmn.Config) *Reb {
Compression: config.Rebalance.Compression,
Multiplier: config.Rebalance.SbundleMult,
}
dm, err := bundle.NewDataMover(trname, reb.recvObj, cmn.OwtMigrateRepl, dmExtra)
dm, err := bundle.NewDataMover(trname, reb.recvObj, cmn.OwtRebalance, dmExtra)
if err != nil {
cos.ExitLog(err)
}
Expand Down
2 changes: 1 addition & 1 deletion reb/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker
{
params.WorkTag = fs.WorkfilePut
params.Reader = io.NopCloser(objReader)
params.OWT = cmn.OwtMigrateRepl
params.OWT = cmn.OwtRebalance
params.Cksum = hdr.ObjAttrs.Cksum
params.Atime = lom.Atime()
params.Xact = xreb
Expand Down
19 changes: 11 additions & 8 deletions xact/xs/tcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type (
kind string
phase string // (see "transition")
args *xreg.TCBArgs
owt cmn.OWT
}
XactTCB struct {
p *tcbFactory
Expand Down Expand Up @@ -74,6 +75,12 @@ func (p *tcbFactory) Start() error {
slab, err = core.T.PageMM().GetSlab(memsys.MaxPageSlabSize) // TODO: estimate
)
debug.AssertNoErr(err)

p.owt = cmn.OwtCopy
if p.kind == apc.ActETLBck {
p.owt = cmn.OwtTransform
}

p.xctn = newTCB(p, slab, config)

// refcount OpcTxnDone; this target must ve active (ref: ignoreMaintenance)
Expand Down Expand Up @@ -104,7 +111,8 @@ func (p *tcbFactory) newDM(config *cmn.Config, uuid string, sizePDU int32) error
Multiplier: config.TCB.SbundleMult,
SizePDU: sizePDU,
}
dm, err := bundle.NewDataMover(trname+"-"+uuid, p.xctn.recv, cmn.OwtPut /* pass via coi */, dmExtra)
// in re cmn.OwtPut: see comment inside _recv()
dm, err := bundle.NewDataMover(trname+"-"+uuid, p.xctn.recv, p.owt, dmExtra)
if err != nil {
return err
}
Expand Down Expand Up @@ -263,7 +271,7 @@ func (r *XactTCB) do(lom *core.LOM, buf []byte) (err error) {
coiParams.BckTo = args.BckTo
coiParams.ObjnameTo = toName
coiParams.Buf = buf
coiParams.OWT = cmn.OwtMigrateRepl
coiParams.OWT = r.p.owt
coiParams.DryRun = args.Msg.DryRun
coiParams.LatestVer = args.Msg.LatestVer
coiParams.Sync = args.Msg.Sync
Expand Down Expand Up @@ -317,12 +325,7 @@ func (r *XactTCB) _recv(hdr *transport.ObjHdr, objReader io.Reader, lom *core.LO
params.Cksum = hdr.ObjAttrs.Cksum
params.Xact = r
params.Size = hdr.ObjAttrs.Size

// Transaction is used only by CopyBucket and ETL. In both cases new objects
// are created at the destination. Setting `OwtPut` type informs `t.PutObject()`
// that it must PUT the object to the remote backend as well
// (but only after the local transaction is done and finalized).
params.OWT = cmn.OwtPut
params.OWT = r.p.owt
}
if lom.AtimeUnix() == 0 {
// TODO: sender must be setting it, remove this `if` when fixed
Expand Down
18 changes: 10 additions & 8 deletions xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
workCh chan *cmn.TCObjsMsg
chanFull atomic.Int64
streamingX
owt cmn.OWT
}
tcowi struct {
r *XactTCObjs
Expand Down Expand Up @@ -83,6 +84,10 @@ func (p *tcoFactory) Start() error {
workCh := make(chan *cmn.TCObjsMsg, maxNumInParallel)
r := &XactTCObjs{streamingX: streamingX{p: &p.streamingF, config: cmn.GCO.Get()}, args: p.args, workCh: workCh}
r.pending.m = make(map[string]*tcowi, maxNumInParallel)
r.owt = cmn.OwtCopy
if p.kind == apc.ActETLObjects {
r.owt = cmn.OwtTransform
}
p.xctn = r
r.DemandBase.Init(p.UUID(), p.Kind(), p.Bck, xact.IdleDefault)

Expand All @@ -92,9 +97,11 @@ func (p *tcoFactory) Start() error {
// apc.ActETLObjects (transform) generates arbitrary sizes where we use PDU-based transport
sizePDU = memsys.DefaultBufSize
}
if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, cmn.OwtPut, sizePDU); err != nil {

if err := p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, r.owt, sizePDU); err != nil {
return err
}

if r.p.dm != nil {
p.dm.SetXact(r)
p.dm.Open()
Expand Down Expand Up @@ -264,12 +271,7 @@ func (r *XactTCObjs) _put(hdr *transport.ObjHdr, objReader io.Reader, lom *core.
params.Cksum = hdr.ObjAttrs.Cksum
params.Xact = r
params.Size = hdr.ObjAttrs.Size

// Transaction is used only by CopyBucket and ETL. In both cases, new objects
// are created at the destination. Setting `OwtPut` type informs `t.PutObject()`
// that it must PUT the object to the remote backend as well
// (but only after the local transaction is done and finalized).
params.OWT = cmn.OwtPut
params.OWT = r.owt
}
if lom.AtimeUnix() == 0 {
// TODO: sender must be setting it, remove this `if` when fixed
Expand Down Expand Up @@ -309,7 +311,7 @@ func (wi *tcowi) do(lom *core.LOM, lrit *lriterator) {
coiParams.BckTo = wi.r.args.BckTo
coiParams.ObjnameTo = objNameTo
coiParams.Buf = buf
coiParams.OWT = cmn.OwtMigrateRepl
coiParams.OWT = wi.r.owt
coiParams.DryRun = wi.msg.DryRun
coiParams.LatestVer = wi.msg.LatestVer
coiParams.Sync = wi.msg.Sync
Expand Down

0 comments on commit b20bc72

Please sign in to comment.