erasure coding: remove redundant state; simplify
Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Dec 10, 2023
1 parent 37101ed · commit 922e49a
Showing 14 changed files with 159 additions and 247 deletions.
ais/tgtcp.go (3 changes: 1 addition & 2 deletions)
@@ -693,8 +693,7 @@ func (t *target) _postBMD(newBMD *bucketMD, tag string, rmbcks []*meta.Bck) {
         }(rmbcks...)
     }
     if tag != bmdReg {
-        // ecmanager will get updated BMD upon its init()
-        if err := ec.ECM.BucketsMDChanged(); err != nil {
+        if err := ec.ECM.BMDChanged(); err != nil {
             nlog.Errorf("Failed to initialize EC manager: %v", err)
         }
     }
ais/tgtfcold.go (2 changes: 1 addition & 1 deletion)
@@ -133,7 +133,7 @@ func (goi *getOI) coldSeek(res *cluster.GetReaderResult) error {
     }

     // make copies and slices (async)
-    if err = ec.ECM.EncodeObject(lom); err != nil && err != ec.ErrorECDisabled {
+    if err = ec.ECM.EncodeObject(lom, nil); err != nil && err != ec.ErrorECDisabled {
         nlog.Errorln(ftcg+"(ec)", lom, err)
     }
     t.putMirror(lom)
ais/tgtobj.go (4 changes: 2 additions & 2 deletions)
@@ -252,7 +252,7 @@ func (poi *putOI) finalize() (errCode int, err error) {
         return
     }
     if !poi.skipEC {
-        if ecErr := ec.ECM.EncodeObject(poi.lom); ecErr != nil && ecErr != ec.ErrorECDisabled {
+        if ecErr := ec.ECM.EncodeObject(poi.lom, nil); ecErr != nil && ecErr != ec.ErrorECDisabled {
             err = ecErr
             if cmn.IsErrCapExceeded(err) {
                 errCode = http.StatusInsufficientStorage
@@ -1742,7 +1742,7 @@ func (a *putA2I) finalize(size int64, cksum *cos.Cksum, fqn string) error {
         return err
     }
     if a.lom.Bprops().EC.Enabled {
-        if err := ec.ECM.EncodeObject(a.lom); err != nil && err != ec.ErrorECDisabled {
+        if err := ec.ECM.EncodeObject(a.lom, nil); err != nil && err != ec.ErrorECDisabled {
             return err
         }
     }
api/apc/actmsg.go (4 changes: 2 additions & 2 deletions)
@@ -22,8 +22,8 @@ const (
     ActSummaryBck = "summary-bck"

     ActECEncode = "ec-encode" // erasure code a bucket
-    ActECGet = "ec-get" // erasure decode objects
-    ActECPut = "ec-put" // erasure encode objects
+    ActECGet = "ec-get" // read erasure coded objects
+    ActECPut = "ec-put" // erasure code objects
     ActECRespond = "ec-resp" // respond to other targets' EC requests

     ActCopyBck = "copy-bck"
ec/ec.go (8 changes: 5 additions & 3 deletions)
@@ -182,7 +182,8 @@
 type global struct {
     t cluster.Target
     reqPool sync.Pool
-    mm *memsys.MMSA // memory manager and slab/SGL allocator
+    pmm *memsys.MMSA // memory manager slab/SGL allocator (pages)
+    smm *memsys.MMSA // ditto, bytes
     emptyReq request
 }

@@ -196,7 +197,8 @@

 func Init(t cluster.Target) {
     g.t = t
-    g.mm = t.PageMM()
+    g.pmm = t.PageMM()
+    g.smm = t.ByteMM()

     fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
     fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
@@ -324,7 +326,7 @@ func useDisk(objSize int64, config *cmn.Config) bool {
     if config.EC.DiskOnly {
         return true
     }
-    memPressure := g.mm.Pressure()
+    memPressure := g.pmm.Pressure()
     switch memPressure {
     case memsys.OOM, memsys.PressureExtreme:
         return true
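For context: a minimal sketch (not part of this commit; the helper name allocForRestore is hypothetical) of the intended division of labor between the two allocators introduced above, assuming the call sites shown in ec/getjogger.go below.

    // Sketch only: g.pmm (page-based MMSA) backs large slice/object SGL buffers,
    // while g.smm (byte-based MMSA) backs small packed intra-cluster request headers.
    func allocForRestore(ctx *restoreCtx, sliceSize int64) {
        sgl := g.pmm.NewSGL(sliceSize) // large, page-backed buffer for slice/object data
        defer sgl.Free()

        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm) // small control message
        defer g.smm.Free(iReqBuf)
        // ... read the remote slice into sgl, send the packed request, etc.
    }

When the page allocator reports memory pressure (see useDisk above), EC falls back to disk-backed writers instead of SGLs.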
ec/getjogger.go (130 changes: 67 additions & 63 deletions)
@@ -148,7 +148,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
     if err := ctx.lom.Load(false /*cache it*/, false /*locked*/); err != nil {
         return err
     }
-    smap := c.parent.smap.Get()
+    smap := g.t.Sowner().Get()
     targets, err := smap.HrwTargetList(ctx.lom.Uname(), ctx.meta.Parity+1)
     if err != nil {
         return err
@@ -157,7 +157,7 @@
     // Fill the list of daemonIDs that do not have replica
     daemons := make([]string, 0, len(targets))
     for _, target := range targets {
-        if target.ID() == c.parent.si.ID() {
+        if target.ID() == g.t.SID() {
             continue
         }

@@ -193,7 +193,7 @@
     // Reason: memsys.Reader does not provide access to internal memsys.SGL that must be freed
     cb := func(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
         if err != nil {
-            nlog.Errorf("%s failed to send %s to %v: %v", c.parent.t, ctx.lom, daemons, err)
+            nlog.Errorf("%s failed to send %s to %v: %v", g.t, ctx.lom, daemons, err)
         }
         freeObject(reader)
     }
@@ -209,22 +209,21 @@
 func (c *getJogger) restoreReplicatedFromMemory(ctx *restoreCtx) error {
     var (
         writer *memsys.SGL
-        mm = c.parent.t.ByteMM()
     )
     // Try to read replica from targets one by one until the replica is downloaded
     for node := range ctx.nodes {
         uname := unique(node, ctx.lom.Bck(), ctx.lom.ObjName)
-        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(mm)
+        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm)

-        w := mm.NewSGL(cos.KiB)
+        w := g.smm.NewSGL(cos.KiB)
         if _, err := c.parent.readRemote(ctx.lom, node, uname, iReqBuf, w); err != nil {
-            nlog.Errorf("%s failed to read from %s", c.parent.t, node)
+            nlog.Errorf("%s failed to read from %s", g.t, node)
             w.Free()
-            mm.Free(iReqBuf)
+            g.smm.Free(iReqBuf)
             w = nil
             continue
         }
-        mm.Free(iReqBuf)
+        g.smm.Free(iReqBuf)
         if w.Size() != 0 {
             // A valid replica is found - break and do not free SGL
             writer = w
@@ -264,7 +263,6 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error {
     var (
         writer *os.File
         n int64
-        mm = c.parent.t.ByteMM()
     )
     // Try to read a replica from targets one by one until the replica is downloaded
     tmpFQN := fs.CSM.Gen(ctx.lom, fs.WorkfileType, "ec-restore-repl")
@@ -277,9 +275,9 @@
             nlog.Errorf("Failed to create file: %v", err)
             break
         }
-        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(mm)
+        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm)
         n, err = c.parent.readRemote(ctx.lom, node, uname, iReqBuf, w)
-        mm.Free(iReqBuf)
+        g.smm.Free(iReqBuf)

         if err == nil && n != 0 {
             // A valid replica is found - break and do close file handle
@@ -314,10 +312,10 @@

     b := cos.MustMarshal(ctx.meta)
     ctMeta := cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)
-    if err := ctMeta.Write(c.parent.t, bytes.NewReader(b), -1); err != nil {
+    if err := ctMeta.Write(g.t, bytes.NewReader(b), -1); err != nil {
         return err
     }
-    if _, exists := c.parent.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
+    if _, exists := g.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
         if errRm := cos.RemoveFile(ctMeta.FQN()); errRm != nil {
             nlog.Errorf("nested error: save restored replica -> remove metafile: %v", errRm)
         }
@@ -370,7 +368,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
             }
         } else {
             writer = &slice{
-                writer: g.mm.NewSGL(cos.KiB * 512),
+                writer: g.pmm.NewSGL(cos.KiB * 512),
                 twg: wgSlices,
             }
         }
@@ -385,8 +383,7 @@

     iReq := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck())
     iReq.isSlice = true
-    mm := c.parent.t.ByteMM()
-    request := iReq.NewPack(mm)
+    request := iReq.NewPack(g.smm)
     hdr := transport.ObjHdr{
         ObjName: ctx.lom.ObjName,
         Opaque: request,
@@ -403,13 +400,13 @@
     }
     if err := c.parent.sendByDaemonID(daemons, o, nil, true); err != nil {
         freeSlices(ctx.slices)
-        mm.Free(request)
+        g.smm.Free(request)
         return err
     }
     if wgSlices.WaitTimeout(c.parent.config.Timeout.SendFile.D()) {
-        nlog.Errorf("%s timed out waiting for %s slices", c.parent.t, ctx.lom)
+        nlog.Errorf("%s timed out waiting for %s slices", g.t, ctx.lom)
     }
-    mm.Free(request)
+    g.smm.Free(request)
     return nil
 }

@@ -430,7 +427,7 @@ func newSliceWriter(ctx *restoreCtx, writers []io.Writer, restored []*slice,
         }
         restored[idx] = &slice{workFQN: fqn, n: sliceSize}
     } else {
-        sgl := g.mm.NewSGL(sliceSize)
+        sgl := g.pmm.NewSGL(sliceSize)
         restored[idx] = &slice{obj: sgl, n: sliceSize}
         if cksumType != cos.ChecksumNone {
             cksums[idx] = cos.NewCksumHash(cksumType)
@@ -630,15 +627,15 @@ func (c *getJogger) emptyTargets(ctx *restoreCtx) ([]string, error) {
         nodeToID[v] = k
     }
     // Generate the list of targets that should have a slice.
-    smap := c.parent.smap.Get()
+    smap := g.t.Sowner().Get()
     targets, err := smap.HrwTargetList(ctx.lom.Uname(), sliceCnt+1)
     if err != nil {
         nlog.Warningln(err)
         return nil, err
     }
     empty := make([]string, 0, len(targets))
     for _, t := range targets {
-        if t.ID() == c.parent.si.ID() {
+        if t.ID() == g.t.SID() {
             continue
         }
         if _, ok := nodeToID[t.ID()]; ok {
@@ -719,14 +716,14 @@ func (c *getJogger) uploadRestoredSlices(ctx *restoreCtx, slices []*slice) error
         cb := func(daemonID string, s *slice) transport.ObjSentCB {
             return func(hdr *transport.ObjHdr, reader io.ReadCloser, _ any, err error) {
                 if err != nil {
-                    nlog.Errorf("%s failed to send %s to %v: %v", c.parent.t, ctx.lom, daemonID, err)
+                    nlog.Errorf("%s failed to send %s to %v: %v", g.t, ctx.lom, daemonID, err)
                 }
                 s.free()
             }
         }(tid, sl)
         if err := c.parent.writeRemote([]string{tid}, ctx.lom, dataSrc, cb); err != nil {
             remoteErr = err
-            nlog.Errorf("%s failed to send slice %s[%d] to %s", c.parent.t, ctx.lom, sliceID, tid)
+            nlog.Errorf("%s failed to send slice %s[%d] to %s", g.t, ctx.lom, sliceID, tid)
         }
     }

@@ -764,7 +761,7 @@ func (c *getJogger) restoreEncoded(ctx *restoreCtx) error {
     // Restore and save locally the main replica
     restored, err := c.restoreMainObj(ctx)
     if err != nil {
-        nlog.Errorf("%s failed to restore main object %s: %v", c.parent.t, ctx.lom, err)
+        nlog.Errorf("%s failed to restore main object %s: %v", g.t, ctx.lom, err)
         c.freeDownloaded(ctx)
         freeSlices(restored)
         return err
@@ -820,54 +817,37 @@ func (c *getJogger) restore(ctx *restoreCtx) error {
 // nodes(with their EC metadata) that have the lastest object version
 func (c *getJogger) requestMeta(ctx *restoreCtx) error {
     var (
-        tmap = c.parent.smap.Get().Tmap
-        wg = cos.NewLimitedWaitGroup(cmn.MaxBcastParallel(), 8)
-        mtx = &sync.Mutex{}
-        gen int64
-        mdExists bool
-    )
-    requestMeta := func(si *meta.Snode) {
-        defer wg.Done()
-        md, err := RequestECMeta(ctx.lom.Bucket(), ctx.lom.ObjName, si, c.client)
-        if err != nil {
-            if mdExists {
-                nlog.Errorf("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
-            } else if c.parent.config.FastV(4, cos.SmoduleEC) {
-                nlog.Infof("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
-            }
-            return
-        }
+        wg = cos.NewLimitedWaitGroup(cmn.MaxBcastParallel(), 8)
+        mtx = &sync.Mutex{}
+        tmap = g.t.Sowner().Get().Tmap
+        ctMeta = cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)

-        mtx.Lock()
-        ctx.nodes[si.ID()] = md
-        // Detect the metadata with the latest generation on the fly.
-        if md.Generation > gen {
-            gen = md.Generation
-            ctx.meta = md
-        }
-        mtx.Unlock()
-    }
-
-    ctMeta := cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)
-    md, err := LoadMetadata(ctMeta.FQN())
-    mdExists = err == nil && len(md.Daemons) != 0
+        md, err = LoadMetadata(ctMeta.FQN())
+        mdExists = err == nil && len(md.Daemons) != 0
+    )
     if mdExists {
         // Metafile exists and contains a list of targets
-        nodes := md.RemoteTargets(c.parent.t)
+        nodes := md.RemoteTargets()
         ctx.nodes = make(map[string]*Metadata, len(nodes))
         for _, node := range nodes {
             wg.Add(1)
-            go requestMeta(node)
+            go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+                ctx.requestMeta(si, c, mtx, mdExists)
+                wg.Done()
+            }(node, c, mtx, mdExists)
         }
     } else {
-        // Otherwise, send cluster-wide request
+        // Otherwise, broadcast
         ctx.nodes = make(map[string]*Metadata, len(tmap))
         for _, node := range tmap {
-            if node.ID() == c.parent.si.ID() {
+            if node.ID() == g.t.SID() {
                 continue
             }
             wg.Add(1)
-            go requestMeta(node)
+            go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+                ctx.requestMeta(si, c, mtx, mdExists)
+                wg.Done()
+            }(node, c, mtx, mdExists)
         }
     }
     wg.Wait()
@@ -879,12 +859,36 @@ func (c *getJogger) requestMeta(ctx *restoreCtx) error {

     // Cleanup: delete all metadatas with "obsolete" information
     for k, v := range ctx.nodes {
-        if v.Generation != gen {
+        if v.Generation != ctx.meta.Generation {
             nlog.Warningf("Target %s[slice id %d] old generation: %v == %v",
-                k, v.SliceID, v.Generation, gen)
+                k, v.SliceID, v.Generation, ctx.meta.Generation)
             delete(ctx.nodes, k)
         }
     }

     return nil
 }
+
+////////////////
+// restoreCtx //
+////////////////
+
+func (ctx *restoreCtx) requestMeta(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+    md, err := RequestECMeta(ctx.lom.Bucket(), ctx.lom.ObjName, si, c.client)
+    if err != nil {
+        if mdExists {
+            nlog.Errorf("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+        } else if c.parent.config.FastV(4, cos.SmoduleEC) {
+            nlog.Infof("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+        }
+        return
+    }
+
+    mtx.Lock()
+    ctx.nodes[si.ID()] = md
+    // Detect the metadata with the latest generation on the fly.
+    if ctx.meta == nil || md.Generation > ctx.meta.Generation {
+        ctx.meta = md
+    }
+    mtx.Unlock()
+}
