Skip to content

Commit

Permalink
add page iterator (LPI) for local pagination
Browse files Browse the repository at this point in the history
* part five, prev. commit: 8109466

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 23, 2025
1 parent e39d6e2 commit 81bc96b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 34 deletions.
7 changes: 0 additions & 7 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,6 @@ func (mi *Mountpath) MakePathCT(bck *cmn.Bck, contentType string) string {
return cos.UnsafeS(buf)
}

func (mi *Mountpath) MakePathPrefix(bck *cmn.Bck, contentType, prefix string) string {
if prefix == "" {
return mi.MakePathCT(bck, contentType)
}
return mi.MakePathFQN(bck, contentType, prefix)
}

func (mi *Mountpath) MakePathFQN(bck *cmn.Bck, contentType, objName string) string {
debug.Assert(contentType != "")
debug.Assert(objName != "")
Expand Down
15 changes: 7 additions & 8 deletions fs/lpi/allmps.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) {
lpis.bck = bck
}
for _, mi := range avail {
it, err := New(mi.MakePathPrefix(bck, fs.ObjectType, prefix))
it, err := New(mi.MakePathCT(bck, fs.ObjectType) /*root*/, prefix)
debug.AssertNoErr(err)
milpi := milpi{
page: make(Page),
Expand All @@ -45,8 +45,7 @@ func (lpis *Lpis) Init(bck *cmn.Bck, prefix string) {
}
}

// TODO: consider jogger-per-mountpath

// TODO: consider jogger-per-mountpath approach
func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) {
var (
lastName string
Expand All @@ -64,7 +63,7 @@ func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) {
continue
}
if lastName != "" {
eop = milpi.mi.MakePathPrefix(lpis.bck, fs.ObjectType, lastName)
eop = milpi.mi.MakePathFQN(lpis.bck, fs.ObjectType, lastName)
}

// next local page "until"
Expand All @@ -76,20 +75,20 @@ func (lpis *Lpis) Do(lastPage cmn.LsoEntries, outPage *cmn.LsoRes, tag string) {
}
}

// 2. last page as a map
// 2. last page as a (mem-pooled) map
lastPageMap := allocPage()
for _, en := range lastPage {
lastPageMap[en.Name] = struct{}{}
lastPageMap[en.Name] = 0
}

// 3. find and add 'remotely-deleted'
for _, milpi := range lpis.a {
for lname := range milpi.page {
for lname, size := range milpi.page {
if _, ok := lastPageMap[lname]; ok {
delete(milpi.page, lname)
continue
}
en := &cmn.LsoEnt{Name: lname}
en := &cmn.LsoEnt{Name: lname, Size: size}
en.SetFlag(apc.EntryVerRemoved | apc.EntryIsCached)
outPage.Entries = append(outPage.Entries, en)
}
Expand Down
48 changes: 31 additions & 17 deletions fs/lpi/lpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"strings"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
Expand All @@ -25,7 +26,7 @@ const (
)

type (
Page map[string]struct{}
Page map[string]int64 // [name => size] TODO: [name => lom] for props

Msg struct {
EOP string // until end-of-page marker
Expand All @@ -35,22 +36,24 @@ type (
// local page iterator (LPI)
// NOTE: it is caller's responsibility to serialize access _or_ take locks.
Iter struct {
page Page
root string
page Page
root string
prefix string

// runtime
current string
next string
msg Msg
lr int
lp int
}
)

var (
errStop = errors.New("stop")
)

func New(root string) (*Iter, error) {
func New(root, prefix string) (*Iter, error) {
finfo, err := os.Stat(root)
if err != nil {
return nil, fmt.Errorf("root fstat: %v", err)
Expand All @@ -59,10 +62,11 @@ func New(root string) (*Iter, error) {
return nil, fmt.Errorf("root is not a directory: %s", root)
}

lpi := &Iter{root: root}
lpi := &Iter{root: root, prefix: prefix}
{
lpi.next = lpi.root
lpi.lr = len(lpi.root)
lpi.lp = len(lpi.prefix)
}
debug.Assert(lpi.lr > 1 && !cos.IsLastB(lpi.root, filepath.Separator), lpi.root)

Expand All @@ -85,19 +89,16 @@ func (lpi *Iter) Next(msg Msg, out Page) error {
if lpi.current > lpi.msg.EOP {
return fmt.Errorf("expected (end-of-page) %q > %q (current)", lpi.msg.EOP, lpi.current)
}
if _, err := os.Stat(lpi.msg.EOP); err != nil {
return fmt.Errorf("end-of-page fstat: %v", err)
}
}

// do page
// next page
err := godirwalk.Walk(lpi.root, &godirwalk.Options{
Unsorted: false,
Callback: lpi.Callback,
ErrorCallback: lpi.ErrorCallback,
})
if err != nil && err != errStop {
return fmt.Errorf("error during walk: %v", err)
return fmt.Errorf("error gowalk-ing: %v", err)
}

return nil
Expand Down Expand Up @@ -125,19 +126,32 @@ func (lpi *Iter) Callback(pathname string, de *godirwalk.Dirent) (err error) {
case pathname < lpi.current:
// skip
case pathname >= lpi.current && (pathname <= lpi.msg.EOP || lpi.msg.EOP == AllPages):
// reached page size
if lpi.msg.Size != 0 && len(lpi.page) >= lpi.msg.Size {
lpi.next = pathname
err = errStop
break
}
// add

rel := pathname[lpi.lr+1:]
debug.AssertFunc(func() bool { _, ok := lpi.page[rel]; return !ok })

if lpi.lp > 0 && len(rel) >= lpi.lp {
if s := rel[0:lpi.lp]; s != lpi.prefix { // TODO: refine
// skip
if s > lpi.prefix {
return filepath.SkipDir
}
break
}
}

debug.AssertFunc(func() bool {
_, ok := lpi.page[rel]
return !ok
})
lpi.page[rel] = struct{}{}
// add
if finfo, e := os.Stat(pathname); e == nil { // TODO: lom.Load() instead
lpi.page[rel] = finfo.Size()
} else if cmn.Rom.FastV(4, cos.SmoduleXs) {
nlog.Warningln(e)
}
default:
// next
debug.Assert(pathname > lpi.msg.EOP && lpi.msg.EOP != AllPages)
Expand All @@ -150,7 +164,7 @@ func (lpi *Iter) Callback(pathname string, de *godirwalk.Dirent) (err error) {

func (*Iter) ErrorCallback(pathname string, err error) godirwalk.ErrorAction {
if err != errStop {
nlog.Warningf("Error accessing %s: %v", pathname, err)
nlog.Warningln("Error accessing", pathname, err)
}
return godirwalk.Halt
}
4 changes: 2 additions & 2 deletions fs/lpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func lpiPageSize(t *testing.T, root string, eops []string, lpiTestPageSize, tota
msg = lpi.Msg{Size: lpiTestPageSize}
num int
)
it, err := lpi.New(root)
it, err := lpi.New(root, "" /*prefix*/) // TODO: add test
tassert.CheckFatal(t, err)

for {
Expand Down Expand Up @@ -102,7 +102,7 @@ func lpiEndOfPage(t *testing.T, root string, eops []string, total int) {
previous string
num int
page = make(lpi.Page, 100)
it, err = lpi.New(root)
it, err = lpi.New(root, "" /*prefix*/) // TODO: add test
)
tassert.CheckFatal(t, err)

Expand Down

0 comments on commit 81bc96b

Please sign in to comment.