Skip to content

Commit

Permalink
refactor list-range-prefix iterator
Browse files Browse the repository at this point in the history
* prep to support '--sync' by multi-object copy/transform

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 19, 2024
1 parent 75d9280 commit e679048
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 83 deletions.
10 changes: 5 additions & 5 deletions xact/xs/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ func (r *XactArch) Run(wg *sync.WaitGroup) {
smap = core.T.Sowner().Get()
lrit = &lriterator{}
)
lrit.init(r, &msg.ListRange)
if msg.IsList() {
err = lrit.iterList(wi, smap)
} else {
err = lrit.rangeOrPref(wi, smap)
err = lrit.init(r, &msg.ListRange, r.Bck())
if err != nil {
r.Abort(err)
goto fin
}
err = lrit.run(wi, smap)
if err != nil {
r.AddErr(err)
}
Expand Down
22 changes: 11 additions & 11 deletions xact/xs/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (p *evdFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable {
return np
}

func (p *evdFactory) Start() error {
p.xctn = newEvictDelete(&p.Args, p.kind, p.Bck, p.msg)
return nil
func (p *evdFactory) Start() (err error) {
p.xctn, err = newEvictDelete(&p.Args, p.kind, p.Bck, p.msg)
return err
}

func (p *evdFactory) Kind() string { return p.kind }
Expand All @@ -55,20 +55,20 @@ func (*evdFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
return xreg.WprKeepAndStartNew, nil
}

func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListRange) (ed *evictDelete) {
func newEvictDelete(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.ListRange) (ed *evictDelete, err error) {
ed = &evictDelete{config: cmn.GCO.Get()}
ed.lriterator.init(ed, msg)
if err = ed.lriterator.init(ed, msg, bck); err != nil {
return nil, err
}
ed.InitBase(xargs.UUID, kind, bck)
return
return ed, nil
}

func (r *evictDelete) Run(wg *sync.WaitGroup) {
wg.Done()
smap := core.T.Sowner().Get()
if r.msg.IsList() {
_ = r.iterList(r, smap)
} else {
_ = r.rangeOrPref(r, smap)
err := r.lriterator.run(r, core.T.Sowner().Get())
if err != nil {
r.AddErr(err, 5, cos.SmoduleXs) // duplicated?
}
r.Finish()
}
Expand Down
111 changes: 64 additions & 47 deletions xact/xs/lrit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ type (
// a strict subset of core.Xact, includes only the methods
// lriterator needs for itself
lrxact interface {
Bck() *meta.Bck
IsAborted() bool
Finished() bool
}
// common multi-obj operation context and iterList()/iterRangeOrPref() logic
lriterator struct {
xctn lrxact
msg *apc.ListRange
lrp int // { lrpList, ... } enum
parent lrxact
msg *apc.ListRange
bck *meta.Bck
pt *cos.ParsedTemplate
prefix string
lrp int // { lrpList, ... } enum
}
)

Expand All @@ -73,16 +75,19 @@ var (
// lriterator //
////////////////

func (r *lriterator) init(xctn lrxact, msg *apc.ListRange) {
r.xctn = xctn
func (r *lriterator) init(xctn lrxact, msg *apc.ListRange, bck *meta.Bck) error {
r.parent = xctn
r.msg = msg
}
r.bck = bck
if msg.IsList() {
r.lrp = lrpList
return nil
}

func (r *lriterator) rangeOrPref(wi lrwi, smap *meta.Smap) error {
pt, err := cos.NewParsedTemplate(r.msg.Template)
pt, err := cos.NewParsedTemplate(msg.Template)
if err != nil {
// NOTE: [making an exception] treating an empty or '*' template
// as an empty prefix to facilitate scope 'all-objects'
// [Making Exception] treating an empty or '*' template
// as an empty prefix, to facilitate all-objects scope
// (e.g., "archive entire bucket as a single shard (caution!)
if err == cos.ErrEmptyTemplate {
pt.Prefix = ""
Expand All @@ -95,19 +100,35 @@ func (r *lriterator) rangeOrPref(wi lrwi, smap *meta.Smap) error {
return err
}
if len(pt.Ranges) != 0 {
r.pt = &pt
r.lrp = lrpRange
return r.iterRange(smap, &pt, wi)
return nil
}
pref:
r.prefix = pt.Prefix
r.lrp = lrpPrefix
return r.iteratePrefix(smap, pt.Prefix, wi)
return nil
}

func (r *lriterator) iterRange(smap *meta.Smap, pt *cos.ParsedTemplate, wi lrwi) error {
pt.InitIter()
for objName, hasNext := pt.Next(); hasNext; objName, hasNext = pt.Next() {
if r.xctn.IsAborted() || r.xctn.Finished() {
return nil
func (r *lriterator) run(wi lrwi, smap *meta.Smap) (err error) {
switch r.lrp {
case lrpList:
err = r._list(wi, smap)
case lrpRange:
err = r._range(wi, smap)
case lrpPrefix:
err = r._prefix(wi, smap)
}
return err
}

func (r *lriterator) done() bool { return r.parent.IsAborted() || r.parent.Finished() }

func (r *lriterator) _list(wi lrwi, smap *meta.Smap) error {
r.lrp = lrpList
for _, objName := range r.msg.ObjNames {
if r.done() {
break
}
lom := core.AllocLOM(objName)
err := r.do(lom, wi, smap)
Expand All @@ -119,33 +140,45 @@ func (r *lriterator) iterRange(smap *meta.Smap, pt *cos.ParsedTemplate, wi lrwi)
return nil
}

// compare with ais/plstcx.go (TODO: unify)
func (r *lriterator) iteratePrefix(smap *meta.Smap, prefix string, wi lrwi) error {
if err := cmn.ValidatePrefix(prefix); err != nil {
return err
func (r *lriterator) _range(wi lrwi, smap *meta.Smap) error {
r.pt.InitIter()
for objName, hasNext := r.pt.Next(); hasNext; objName, hasNext = r.pt.Next() {
if r.done() {
return nil
}
lom := core.AllocLOM(objName)
err := r.do(lom, wi, smap)
core.FreeLOM(lom)
if err != nil {
return err
}
}
return nil
}

// (compare with ais/plstcx)
func (r *lriterator) _prefix(wi lrwi, smap *meta.Smap) error {
var (
err error
lst *cmn.LsoResult
msg = &apc.LsoMsg{Prefix: prefix, Props: apc.GetPropsStatus}
bck = r.xctn.Bck()
npg = newNpgCtx(bck, msg, noopCb)
bremote = bck.IsRemote()
msg = &apc.LsoMsg{Prefix: r.prefix, Props: apc.GetPropsStatus}
npg = newNpgCtx(r.bck, msg, noopCb)
bremote = r.bck.IsRemote()
)
if err := bck.Init(core.T.Bowner()); err != nil {
if err := r.bck.Init(core.T.Bowner()); err != nil {
return err
}
if !bremote {
smap = nil // not needed
}
for {
if r.xctn.IsAborted() || r.xctn.Finished() {
if r.done() {
break
}
if bremote {
var errCode int
lst = &cmn.LsoResult{Entries: allocLsoEntries()}
errCode, err = core.T.Backend(bck).ListObjects(bck, msg, lst) // (TODO comment above)
errCode, err = core.T.Backend(r.bck).ListObjects(r.bck, msg, lst) // (TODO comment above)
if err != nil {
freeLsoEntries(lst.Entries)
if errCode == http.StatusNotFound && !cos.IsNotExist(err, 0) {
Expand All @@ -165,7 +198,7 @@ func (r *lriterator) iteratePrefix(smap *meta.Smap, prefix string, wi lrwi) erro
if !be.IsStatusOK() {
continue
}
if r.xctn.IsAborted() || r.xctn.Finished() {
if r.done() {
freeLsoEntries(lst.Entries)
return nil
}
Expand All @@ -188,24 +221,8 @@ func (r *lriterator) iteratePrefix(smap *meta.Smap, prefix string, wi lrwi) erro
return nil
}

func (r *lriterator) iterList(wi lrwi, smap *meta.Smap) error {
r.lrp = lrpList
for _, objName := range r.msg.ObjNames {
if r.xctn.IsAborted() || r.xctn.Finished() {
break
}
lom := core.AllocLOM(objName)
err := r.do(lom, wi, smap)
core.FreeLOM(lom)
if err != nil {
return err
}
}
return nil
}

func (r *lriterator) do(lom *core.LOM, wi lrwi, smap *meta.Smap) error {
if err := lom.InitBck(r.xctn.Bck().Bucket()); err != nil {
if err := lom.InitBck(r.bck.Bucket()); err != nil {
return err
}
if smap != nil {
Expand Down
29 changes: 14 additions & 15 deletions xact/xs/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ func (*prfFactory) New(args xreg.Args, bck *meta.Bck) xreg.Renewable {
return np
}

func (p *prfFactory) Start() error {
func (p *prfFactory) Start() (err error) {
b := p.Bck
if err := b.Init(core.T.Bowner()); err != nil {
if err = b.Init(core.T.Bowner()); err != nil {
return err
}
if b.IsAIS() {
return fmt.Errorf("bucket %s is not _remote_ (can only prefetch remote buckets)", b)
}
p.xctn = newPrefetch(&p.Args, p.Kind(), b, p.msg)
return nil
p.xctn, err = newPrefetch(&p.Args, p.Kind(), b, p.msg)
return err
}

func (*prfFactory) Kind() string { return apc.ActPrefetchObjects }
Expand All @@ -64,24 +64,23 @@ func (*prfFactory) WhenPrevIsRunning(xreg.Renewable) (xreg.WPR, error) {
return xreg.WprKeepAndStartNew, nil
}

func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.PrefetchMsg) *prefetch {
r := &prefetch{config: cmn.GCO.Get(), msg: msg}
func newPrefetch(xargs *xreg.Args, kind string, bck *meta.Bck, msg *apc.PrefetchMsg) (r *prefetch, err error) {
r = &prefetch{config: cmn.GCO.Get(), msg: msg}

r.lriterator.init(r, &msg.ListRange)
err = r.lriterator.init(r, &msg.ListRange, bck)
if err != nil {
return nil, err
}
r.InitBase(xargs.UUID, kind, bck)
r.lriterator.xctn = r

r.latestVer = bck.VersionConf().ValidateWarmGet || msg.LatestVer
return r
return r, nil
}

func (r *prefetch) Run(wg *sync.WaitGroup) {
wg.Done()
smap := core.T.Sowner().Get()
if r.msg.IsList() {
_ = r.iterList(r, smap)
} else {
_ = r.rangeOrPref(r, smap)
err := r.lriterator.run(r, core.T.Sowner().Get())
if err != nil {
r.AddErr(err, 5, cos.SmoduleXs) // duplicated?
}
r.Finish()
}
Expand Down
7 changes: 2 additions & 5 deletions xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,8 @@ func (r *XactTCObjs) Run(wg *sync.WaitGroup) {
nat := smap.CountActiveTs()
wi.refc.Store(int32(nat - 1))

lrit.init(r, &msg.ListRange)
if msg.IsList() {
err = lrit.iterList(wi, smap)
} else {
err = lrit.rangeOrPref(wi, smap)
if err = lrit.init(r, &msg.ListRange, r.Bck()); err == nil {
err = lrit.run(wi, smap)
}
if r.IsAborted() || err != nil {
goto fin
Expand Down

0 comments on commit e679048

Please sign in to comment.