From 81bc96b6413892404df6e6229353001612477f97 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Thu, 23 Jan 2025 13:50:10 -0500 Subject: [PATCH] add page iterator (LPI) for local pagination * part five, prev. commit: 81094664a9f8 Signed-off-by: Alex Aizman --- fs/fs.go | 7 ------- fs/lpi/allmps.go | 15 +++++++-------- fs/lpi/lpi.go | 48 +++++++++++++++++++++++++++++++----------------- fs/lpi_test.go | 4 ++-- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/fs/fs.go b/fs/fs.go index 9611e836785..d5955b5e2b3 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -281,13 +281,6 @@ func (mi *Mountpath) MakePathCT(bck *cmn.Bck, contentType string) string { return cos.UnsafeS(buf) } -func (mi *Mountpath) MakePathPrefix(bck *cmn.Bck, contentType, prefix string) string { - if prefix == "" { - return mi.MakePathCT(bck, contentType) - } - return mi.MakePathFQN(bck, contentType, prefix) -} - func (mi *Mountpath) MakePathFQN(bck *cmn.Bck, contentType, objName string) string { debug.Assert(contentType != "") debug.Assert(objName != "") diff --git a/fs/lpi/allmps.go b/fs/lpi/allmps.go index 4aa6cd56968..e29a95dc825 100644 --- a/fs/lpi/allmps.go +++ b/fs/lpi/allmps.go @@ -34,7 +34,7 @@ func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) { lpis.bck = bck } for _, mi := range avail { - it, err := New(mi.MakePathPrefix(bck, fs.ObjectType, prefix)) + it, err := New(mi.MakePathCT(bck, fs.ObjectType) /*root*/, prefix) debug.AssertNoErr(err) milpi := milpi{ page: make(Page), @@ -45,8 +45,7 @@ func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) { } } -// TODO: consider jogger-per-mountpath - +// TODO: consider jogger-per-mountpath approach func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) { var ( lastName string @@ -64,7 +63,7 @@ func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) { continue } if lastName != "" { - eop = milpi.mi.MakePathPrefix(lpis.bck, fs.ObjectType, lastName) + eop = milpi.mi.MakePathFQN(lpis.bck, fs.ObjectType, lastName) } // next local page "until" @@ -76,20 +75,20 @@ func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) { } } - // 2. last page as a map + // 2. last page as a (mem-pooled) map lastPageMap := allocPage() for _, en := range lastPage { - lastPageMap[en.Name] = struct{}{} + lastPageMap[en.Name] = 0 } // 3. find and add 'remotely-deleted' for _, milpi := range lpis.a { - for lname := range milpi.page { + for lname, size := range milpi.page { if _, ok := lastPageMap[lname]; ok { delete(milpi.page, lname) continue } - en := &cmn.LsoEnt{Name: lname} + en := &cmn.LsoEnt{Name: lname, Size: size} en.SetFlag(apc.EntryVerRemoved | apc.EntryIsCached) outPage.Entries = append(outPage.Entries, en) } diff --git a/fs/lpi/lpi.go b/fs/lpi/lpi.go index 786018535c2..b8e417d6151 100644 --- a/fs/lpi/lpi.go +++ b/fs/lpi/lpi.go @@ -11,6 +11,7 @@ import ( "path/filepath" "strings" + "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" "github.com/NVIDIA/aistore/cmn/nlog" @@ -25,7 +26,7 @@ const ( ) type ( - Page map[string]struct{} + Page map[string]int64 // [name => size] TODO: [name => lom] for props Msg struct { EOP string // until end-of-page marker @@ -35,14 +36,16 @@ type ( // local page iterator (LPI) // NOTE: it is caller's responsibility to serialize access _or_ take locks. Iter struct { - page Page - root string + page Page + root string + prefix string // runtime current string next string msg Msg lr int + lp int } ) @@ -50,7 +53,7 @@ var ( errStop = errors.New("stop") ) -func New(root string) (*Iter, error) { +func New(root, prefix string) (*Iter, error) { finfo, err := os.Stat(root) if err != nil { return nil, fmt.Errorf("root fstat: %v", err) @@ -59,10 +62,11 @@ func New(root string) (*Iter, error) { return nil, fmt.Errorf("root is not a directory: %s", root) } - lpi := &Iter{root: root} + lpi := &Iter{root: root, prefix: prefix} { lpi.next = lpi.root lpi.lr = len(lpi.root) + lpi.lp = len(lpi.prefix) } debug.Assert(lpi.lr > 1 && !cos.IsLastB(lpi.root, filepath.Separator), lpi.root) @@ -85,19 +89,16 @@ func (lpi *Iter) Next(msg Msg, out Page) error { if lpi.current > lpi.msg.EOP { return fmt.Errorf("expected (end-of-page) %q > %q (current)", lpi.msg.EOP, lpi.current) } - if _, err := os.Stat(lpi.msg.EOP); err != nil { - return fmt.Errorf("end-of-page fstat: %v", err) - } } - // do page + // next page err := godirwalk.Walk(lpi.root, &godirwalk.Options{ Unsorted: false, Callback: lpi.Callback, ErrorCallback: lpi.ErrorCallback, }) if err != nil && err != errStop { - return fmt.Errorf("error during walk: %v", err) + return fmt.Errorf("error gowalk-ing: %v", err) } return nil @@ -125,19 +126,32 @@ func (lpi *Iter) Callback(pathname string, de *godirwalk.Dirent) (err error) { case pathname < lpi.current: // skip case pathname >= lpi.current && (pathname <= lpi.msg.EOP || lpi.msg.EOP == AllPages): + // reached page size if lpi.msg.Size != 0 && len(lpi.page) >= lpi.msg.Size { lpi.next = pathname err = errStop break } - // add + rel := pathname[lpi.lr+1:] + debug.AssertFunc(func() bool { _, ok := lpi.page[rel]; return !ok }) + + if lpi.lp > 0 && len(rel) >= lpi.lp { + if s := rel[0:lpi.lp]; s != lpi.prefix { // TODO: refine + // skip + if s > lpi.prefix { + return filepath.SkipDir + } + break + } + } - debug.AssertFunc(func() bool { - _, ok := lpi.page[rel] - return !ok - }) - lpi.page[rel] = struct{}{} + // add + if finfo, e := os.Stat(pathname); e == nil { // TODO: lom.Load() instead + lpi.page[rel] = finfo.Size() + } else if cmn.Rom.FastV(4, cos.SmoduleXs) { + nlog.Warningln(e) + } default: // next debug.Assert(pathname > lpi.msg.EOP && lpi.msg.EOP != AllPages) @@ -150,7 +164,7 @@ func (lpi *Iter) Callback(pathname string, de *godirwalk.Dirent) (err error) { func (*Iter) ErrorCallback(pathname string, err error) godirwalk.ErrorAction { if err != errStop { - nlog.Warningf("Error accessing %s: %v", pathname, err) + nlog.Warningln("Error accessing", pathname, err) } return godirwalk.Halt } diff --git a/fs/lpi_test.go b/fs/lpi_test.go index 8f37366401e..56a42e59388 100644 --- a/fs/lpi_test.go +++ b/fs/lpi_test.go @@ -71,7 +71,7 @@ func lpiPageSize(t *testing.T, root string, eops []string, lpiTestPageSize, tota msg = lpi.Msg{Size: lpiTestPageSize} num int ) - it, err := lpi.New(root) + it, err := lpi.New(root, "" /*prefix*/) // TODO: add test tassert.CheckFatal(t, err) for { @@ -102,7 +102,7 @@ func lpiEndOfPage(t *testing.T, root string, eops []string, total int) { previous string num int page = make(lpi.Page, 100) - it, err = lpi.New(root) + it, err = lpi.New(root, "" /*prefix*/) // TODO: add test ) tassert.CheckFatal(t, err)