Skip to content

Commit

Permalink
amend metadata cache-flushing logic
Browse files Browse the repository at this point in the history
* (atime, prefetch, is-dirty)
* clarify; make it readable
* extract "terminating" case
* keep "dirty" longer but not forever

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 29, 2023
1 parent ca8f4b0 commit e8c4000
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 43 deletions.
5 changes: 4 additions & 1 deletion cmd/cli/teb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import (
func fmtObjStatus(obj *cmn.LsoEntry) string {
switch obj.Status() {
case apc.LocOK:
return "ok"
if obj.IsPresent() {
return "ok"
}
return UnknownStatusVal
case apc.LocMisplacedNode:
return "misplaced(cluster)"
case apc.LocMisplacedMountpath:
Expand Down
109 changes: 69 additions & 40 deletions core/lcache_hk.go → core/lcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
Expand All @@ -21,23 +20,26 @@ import (
)

const (
oomEvictAtime = time.Minute * 5 // OOM
mpeEvictAtime = time.Minute * 10 // extreme
mphEvictAtime = time.Minute * 20 // high
mpnEvictAtime = time.Hour // normal
oomEvictAtime = time.Minute * 5 // OOM
mpeEvictAtime = time.Minute * 10 // extreme
mphEvictAtime = time.Minute * 20 // high
mpnEvictAtime = time.Hour // normal

iniEvictAtime = mpnEvictAtime / 2 // initial
maxEvictAtime = mpnEvictAtime * 2 // maximum
)

const termDuration = time.Duration(-1)

type lchk struct {
cache *sync.Map
now time.Time
d time.Duration
cache *sync.Map
// runtime
now time.Time
d time.Duration
totalCnt int64
// stats
evictedCnt int64
totalCnt int64
flushColdCnt int64
running atomic.Bool
// single entry
running atomic.Bool
}

func regLomCacheWithHK() {
Expand Down Expand Up @@ -100,6 +102,10 @@ func (lchk *lchk) housekeep() (d time.Duration) {
return
}

const termDuration = time.Duration(-1)

func (lchk *lchk) terminating() bool { return lchk.d == termDuration }

func (*lchk) mp() (d time.Duration, tag string) {
p := g.pmm.Pressure()
switch p {
Expand All @@ -126,12 +132,17 @@ func (lchk *lchk) evictAll(d time.Duration) {

// single-threaded: one cache at a time
caches := lomCaches()
if lchk.terminating() {
for _, cache := range caches {
lchk.cache = cache
cache.Range(lchk.fterm)
}
return
}

for _, cache := range caches {
lchk.cache = cache
cache.Range(lchk.f)
}
if d == termDuration {
return
cache.Range(lchk.frun)
}

if _, tag := lchk.mp(); tag != "" {
Expand All @@ -145,38 +156,56 @@ func (lchk *lchk) evictAll(d time.Duration) {
lchk.running.Store(false)
}

func (lchk *lchk) f(hkey, value any) bool {
func (lchk *lchk) fterm(_, value any) bool {
md := value.(*lmeta)
if md.Atime < 0 {
// prefetched, not yet accessed
lchk.flush(md, time.Unix(0, -md.Atime))
return true
}
if md.isDirty() || md.atimefs != uint64(md.Atime) {
lchk.flush(md, time.Unix(0, md.Atime))
}
return true
}

func (lchk *lchk) frun(hkey, value any) bool {
var (
md = value.(*lmeta)
mdTime = md.Atime
special bool
md = value.(*lmeta)
mdTime = md.Atime
)
lchk.totalCnt++

if mdTime < 0 {
mdTime = -mdTime // special case: prefetched but not yet accessed
special = true
// prefetched, not yet accessed
mdTime = -mdTime
}
lchk.totalCnt++
atime := time.Unix(0, mdTime)
if lchk.d != termDuration && lchk.now.Sub(atime) < lchk.d {
elapsed := lchk.now.Sub(atime)
if elapsed < lchk.d {
return true
}

atimefs := md.atimefs & ^lomDirtyMask
if special || (md.Atime > 0 && atimefs != uint64(md.Atime)) {
debug.Assert(cos.IsValidAtime(md.Atime), md.Atime)
lif := LIF{Uname: md.uname, BID: md.bckID}
lom, err := lif.LOM()
if err == nil {
lom.Lock(true)
lom.flushCold(md, atime)
lom.Unlock(true)
FreeLOM(lom)
lchk.flushColdCnt++
if md.isDirty() {
if lchk.d > mphEvictAtime && elapsed < maxEvictAtime {
return true
}
lchk.flush(md, atime)
} else if md.atimefs != uint64(md.Atime) {
lchk.flush(md, atime)
}
if lchk.d != termDuration {
lchk.cache.Delete(hkey)
lchk.evictedCnt++
}
lchk.cache.Delete(hkey)
lchk.evictedCnt++
return true
}

func (lchk *lchk) flush(md *lmeta, atime time.Time) {
lif := LIF{Uname: md.uname, BID: md.bckID}
lom, err := lif.LOM()
if err == nil {
lom.Lock(true)
lom.flushCold(md, atime)
lom.Unlock(true)
FreeLOM(lom)
lchk.flushColdCnt++
}
}
2 changes: 2 additions & 0 deletions core/lcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
// LOM copy management
//

const fmtNestedErr = "nested err: %v"

func (lom *LOM) whingeCopy() (yes bool) {
if !lom.IsCopy() {
return
Expand Down
File renamed without changes.
2 changes: 0 additions & 2 deletions core/lom.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import (
// that contains this LOM

const (
fmtNestedErr = "nested err: %v"
lomInitialVersion = "1"
lomDirtyMask = uint64(1 << 63)
)

// lcache stats
Expand Down
2 changes: 2 additions & 0 deletions core/lom_xattr.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func _recomputeMdSize(size, mdSize int64) {
// lmeta //
///////////

const lomDirtyMask = uint64(1 << 63)

func (md *lmeta) makeDirty() { md.atimefs |= lomDirtyMask }
func (md *lmeta) clearDirty() { md.atimefs &= ^lomDirtyMask }
func (md *lmeta) isDirty() bool { return md.atimefs&lomDirtyMask == lomDirtyMask }
Expand Down

0 comments on commit e8c4000

Please sign in to comment.