From 922e49a5a0bdd530606df7e061574d513ad9193e Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Sun, 10 Dec 2023 12:58:47 -0500
Subject: [PATCH] erasure coding: remove redundant state; simplify

Signed-off-by: Alex Aizman
---
 ais/tgtcp.go         |   3 +-
 ais/tgtfcold.go      |   2 +-
 ais/tgtobj.go        |   4 +-
 api/apc/actmsg.go    |   4 +-
 ec/ec.go             |   8 ++-
 ec/getjogger.go      | 130 ++++++++++++++++++++++---------------------
 ec/getxaction.go     |  12 ++--
 ec/manager.go        | 120 +++++++++------------------------------
 ec/metafile.go       |   6 +-
 ec/putjogger.go      |  42 +++++++-------
 ec/putxaction.go     |   8 +--
 ec/respondxaction.go |  17 +++---
 ec/xaction.go        |  48 ++++++----------
 reb/recv.go          |   2 +-
 14 files changed, 159 insertions(+), 247 deletions(-)

diff --git a/ais/tgtcp.go b/ais/tgtcp.go
index 9456ede5cb2..b12aed89f60 100644
--- a/ais/tgtcp.go
+++ b/ais/tgtcp.go
@@ -693,8 +693,7 @@ func (t *target) _postBMD(newBMD *bucketMD, tag string, rmbcks []*meta.Bck) {
         }(rmbcks...)
     }
     if tag != bmdReg {
-        // ecmanager will get updated BMD upon its init()
-        if err := ec.ECM.BucketsMDChanged(); err != nil {
+        if err := ec.ECM.BMDChanged(); err != nil {
             nlog.Errorf("Failed to initialize EC manager: %v", err)
         }
     }
diff --git a/ais/tgtfcold.go b/ais/tgtfcold.go
index bea1dd7b5b9..c82c218dd73 100644
--- a/ais/tgtfcold.go
+++ b/ais/tgtfcold.go
@@ -133,7 +133,7 @@ func (goi *getOI) coldSeek(res *cluster.GetReaderResult) error {
     }
 
     // make copies and slices (async)
-    if err = ec.ECM.EncodeObject(lom); err != nil && err != ec.ErrorECDisabled {
+    if err = ec.ECM.EncodeObject(lom, nil); err != nil && err != ec.ErrorECDisabled {
         nlog.Errorln(ftcg+"(ec)", lom, err)
     }
     t.putMirror(lom)
diff --git a/ais/tgtobj.go b/ais/tgtobj.go
index 2c6af9e13cd..bd653bc3a20 100644
--- a/ais/tgtobj.go
+++ b/ais/tgtobj.go
@@ -252,7 +252,7 @@ func (poi *putOI) finalize() (errCode int, err error) {
         return
     }
     if !poi.skipEC {
-        if ecErr := ec.ECM.EncodeObject(poi.lom); ecErr != nil && ecErr != ec.ErrorECDisabled {
+        if ecErr := ec.ECM.EncodeObject(poi.lom, nil); ecErr != nil && ecErr != ec.ErrorECDisabled {
             err = ecErr
             if cmn.IsErrCapExceeded(err) {
                 errCode = http.StatusInsufficientStorage
@@ -1742,7 +1742,7 @@ func (a *putA2I) finalize(size int64, cksum *cos.Cksum, fqn string) error {
         return err
     }
     if a.lom.Bprops().EC.Enabled {
-        if err := ec.ECM.EncodeObject(a.lom); err != nil && err != ec.ErrorECDisabled {
+        if err := ec.ECM.EncodeObject(a.lom, nil); err != nil && err != ec.ErrorECDisabled {
             return err
         }
     }
diff --git a/api/apc/actmsg.go b/api/apc/actmsg.go
index 9671570c415..8323d4c0555 100644
--- a/api/apc/actmsg.go
+++ b/api/apc/actmsg.go
@@ -22,8 +22,8 @@ const (
     ActSummaryBck = "summary-bck"
 
     ActECEncode  = "ec-encode" // erasure code a bucket
-    ActECGet     = "ec-get"    // erasure decode objects
-    ActECPut     = "ec-put"    // erasure encode objects
+    ActECGet     = "ec-get"    // read erasure coded objects
+    ActECPut     = "ec-put"    // erasure code objects
     ActECRespond = "ec-resp"   // respond to other targets' EC requests
 
     ActCopyBck = "copy-bck"
diff --git a/ec/ec.go b/ec/ec.go
index 96f3089f5ca..86f2bb9aa5e 100644
--- a/ec/ec.go
+++ b/ec/ec.go
@@ -182,7 +182,8 @@ type (
 type global struct {
     t        cluster.Target
     reqPool  sync.Pool
-    mm       *memsys.MMSA // memory manager and slab/SGL allocator
+    pmm      *memsys.MMSA // memory manager slab/SGL allocator (pages)
+    smm      *memsys.MMSA // ditto, bytes
     emptyReq request
 }
 
@@ -196,7 +197,8 @@ var (
 
 func Init(t cluster.Target) {
     g.t = t
-    g.mm = t.PageMM()
+    g.pmm = t.PageMM()
+    g.smm = t.ByteMM()
 
     fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
     fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
@@ -324,7 +326,7 @@ func useDisk(objSize int64, config *cmn.Config) bool {
     if config.EC.DiskOnly {
         return true
     }
-    memPressure := g.mm.Pressure()
+    memPressure := g.pmm.Pressure()
     switch memPressure {
     case memsys.OOM, memsys.PressureExtreme:
         return true
diff --git a/ec/getjogger.go b/ec/getjogger.go
index d4b3a14e3e9..0da1da17fcf 100644
--- a/ec/getjogger.go
+++ b/ec/getjogger.go
@@ -148,7 +148,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
     if err := ctx.lom.Load(false /*cache it*/, false /*locked*/); err != nil {
         return err
     }
-    smap := c.parent.smap.Get()
+    smap := g.t.Sowner().Get()
     targets, err := smap.HrwTargetList(ctx.lom.Uname(), ctx.meta.Parity+1)
     if err != nil {
         return err
     }
@@ -157,7 +157,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
     // Fill the list of daemonIDs that do not have replica
     daemons := make([]string, 0, len(targets))
     for _, target := range targets {
-        if target.ID() == c.parent.si.ID() {
+        if target.ID() == g.t.SID() {
             continue
         }
@@ -193,7 +193,7 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
     // Reason: memsys.Reader does not provide access to internal memsys.SGL that must be freed
     cb := func(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
         if err != nil {
-            nlog.Errorf("%s failed to send %s to %v: %v", c.parent.t, ctx.lom, daemons, err)
+            nlog.Errorf("%s failed to send %s to %v: %v", g.t, ctx.lom, daemons, err)
         }
         freeObject(reader)
     }
@@ -209,22 +209,21 @@ func (c *getJogger) copyMissingReplicas(ctx *restoreCtx, reader cos.ReadOpenClos
 func (c *getJogger) restoreReplicatedFromMemory(ctx *restoreCtx) error {
     var (
         writer *memsys.SGL
-        mm     = c.parent.t.ByteMM()
     )
     // Try to read replica from targets one by one until the replica is downloaded
     for node := range ctx.nodes {
         uname := unique(node, ctx.lom.Bck(), ctx.lom.ObjName)
-        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(mm)
+        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm)
 
-        w := mm.NewSGL(cos.KiB)
+        w := g.smm.NewSGL(cos.KiB)
         if _, err := c.parent.readRemote(ctx.lom, node, uname, iReqBuf, w); err != nil {
-            nlog.Errorf("%s failed to read from %s", c.parent.t, node)
+            nlog.Errorf("%s failed to read from %s", g.t, node)
             w.Free()
-            mm.Free(iReqBuf)
+            g.smm.Free(iReqBuf)
             w = nil
             continue
         }
-        mm.Free(iReqBuf)
+        g.smm.Free(iReqBuf)
         if w.Size() != 0 {
             // A valid replica is found - break and do not free SGL
             writer = w
@@ -264,7 +263,6 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error {
     var (
         writer *os.File
         n      int64
-        mm     = c.parent.t.ByteMM()
     )
     // Try to read a replica from targets one by one until the replica is downloaded
     tmpFQN := fs.CSM.Gen(ctx.lom, fs.WorkfileType, "ec-restore-repl")
@@ -277,9 +275,9 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error {
             nlog.Errorf("Failed to create file: %v", err)
             break
         }
-        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(mm)
+        iReqBuf := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck()).NewPack(g.smm)
         n, err = c.parent.readRemote(ctx.lom, node, uname, iReqBuf, w)
-        mm.Free(iReqBuf)
+        g.smm.Free(iReqBuf)
 
         if err == nil && n != 0 {
             // A valid replica is found - break and do close file handle
@@ -314,10 +312,10 @@ func (c *getJogger) restoreReplicatedFromDisk(ctx *restoreCtx) error {
 
     b := cos.MustMarshal(ctx.meta)
     ctMeta := cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)
-    if err := ctMeta.Write(c.parent.t, bytes.NewReader(b), -1); err != nil {
+    if err := ctMeta.Write(g.t, bytes.NewReader(b), -1); err != nil {
         return err
     }
-    if _, exists := c.parent.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
+    if _, exists := g.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
         if errRm := cos.RemoveFile(ctMeta.FQN()); errRm != nil {
             nlog.Errorf("nested error: save restored replica -> remove metafile: %v", errRm)
         }
@@ -370,7 +368,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
             }
         } else {
             writer = &slice{
-                writer: g.mm.NewSGL(cos.KiB * 512),
+                writer: g.pmm.NewSGL(cos.KiB * 512),
                 twg:    wgSlices,
             }
         }
@@ -385,8 +383,7 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
     iReq := newIntraReq(reqGet, ctx.meta, ctx.lom.Bck())
     iReq.isSlice = true
 
-    mm := c.parent.t.ByteMM()
-    request := iReq.NewPack(mm)
+    request := iReq.NewPack(g.smm)
     hdr := transport.ObjHdr{
         ObjName: ctx.lom.ObjName,
         Opaque:  request,
@@ -403,13 +400,13 @@ func (c *getJogger) requestSlices(ctx *restoreCtx) error {
     }
     if err := c.parent.sendByDaemonID(daemons, o, nil, true); err != nil {
         freeSlices(ctx.slices)
-        mm.Free(request)
+        g.smm.Free(request)
         return err
     }
     if wgSlices.WaitTimeout(c.parent.config.Timeout.SendFile.D()) {
-        nlog.Errorf("%s timed out waiting for %s slices", c.parent.t, ctx.lom)
+        nlog.Errorf("%s timed out waiting for %s slices", g.t, ctx.lom)
     }
-    mm.Free(request)
+    g.smm.Free(request)
     return nil
 }
@@ -430,7 +427,7 @@ func newSliceWriter(ctx *restoreCtx, writers []io.Writer, restored []*slice,
         }
         restored[idx] = &slice{workFQN: fqn, n: sliceSize}
     } else {
-        sgl := g.mm.NewSGL(sliceSize)
+        sgl := g.pmm.NewSGL(sliceSize)
         restored[idx] = &slice{obj: sgl, n: sliceSize}
         if cksumType != cos.ChecksumNone {
             cksums[idx] = cos.NewCksumHash(cksumType)
         }
@@ -630,7 +627,7 @@ func (c *getJogger) emptyTargets(ctx *restoreCtx) ([]string, error) {
         nodeToID[v] = k
     }
     // Generate the list of targets that should have a slice.
-    smap := c.parent.smap.Get()
+    smap := g.t.Sowner().Get()
     targets, err := smap.HrwTargetList(ctx.lom.Uname(), sliceCnt+1)
     if err != nil {
         nlog.Warningln(err)
     }
     empty := make([]string, 0, len(targets))
     for _, t := range targets {
-        if t.ID() == c.parent.si.ID() {
+        if t.ID() == g.t.SID() {
             continue
         }
         if _, ok := nodeToID[t.ID()]; ok {
@@ -719,14 +716,14 @@ func (c *getJogger) uploadRestoredSlices(ctx *restoreCtx, slices []*slice) error
         cb := func(daemonID string, s *slice) transport.ObjSentCB {
             return func(hdr *transport.ObjHdr, reader io.ReadCloser, _ any, err error) {
                 if err != nil {
-                    nlog.Errorf("%s failed to send %s to %v: %v", c.parent.t, ctx.lom, daemonID, err)
+                    nlog.Errorf("%s failed to send %s to %v: %v", g.t, ctx.lom, daemonID, err)
                 }
                 s.free()
             }
         }(tid, sl)
         if err := c.parent.writeRemote([]string{tid}, ctx.lom, dataSrc, cb); err != nil {
             remoteErr = err
-            nlog.Errorf("%s failed to send slice %s[%d] to %s", c.parent.t, ctx.lom, sliceID, tid)
+            nlog.Errorf("%s failed to send slice %s[%d] to %s", g.t, ctx.lom, sliceID, tid)
         }
     }
@@ -764,7 +761,7 @@ func (c *getJogger) restoreEncoded(ctx *restoreCtx) error {
     // Restore and save locally the main replica
     restored, err := c.restoreMainObj(ctx)
     if err != nil {
-        nlog.Errorf("%s failed to restore main object %s: %v", c.parent.t, ctx.lom, err)
+        nlog.Errorf("%s failed to restore main object %s: %v", g.t, ctx.lom, err)
         c.freeDownloaded(ctx)
         freeSlices(restored)
         return err
     }
@@ -820,54 +817,37 @@ func (c *getJogger) restore(ctx *restoreCtx) error {
 // nodes(with their EC metadata) that have the lastest object version
 func (c *getJogger) requestMeta(ctx *restoreCtx) error {
     var (
-        tmap     = c.parent.smap.Get().Tmap
-        wg       = cos.NewLimitedWaitGroup(cmn.MaxBcastParallel(), 8)
-        mtx      = &sync.Mutex{}
-        gen      int64
-        mdExists bool
-    )
-    requestMeta := func(si *meta.Snode) {
-        defer wg.Done()
-        md, err := RequestECMeta(ctx.lom.Bucket(), ctx.lom.ObjName, si, c.client)
-        if err != nil {
-            if mdExists {
-                nlog.Errorf("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
-            } else if c.parent.config.FastV(4, cos.SmoduleEC) {
-                nlog.Infof("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
-            }
-            return
-        }
+        wg     = cos.NewLimitedWaitGroup(cmn.MaxBcastParallel(), 8)
+        mtx    = &sync.Mutex{}
+        tmap   = g.t.Sowner().Get().Tmap
+        ctMeta = cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)
 
-        mtx.Lock()
-        ctx.nodes[si.ID()] = md
-        // Detect the metadata with the latest generation on the fly.
-        if md.Generation > gen {
-            gen = md.Generation
-            ctx.meta = md
-        }
-        mtx.Unlock()
-    }
-
-    ctMeta := cluster.NewCTFromLOM(ctx.lom, fs.ECMetaType)
-    md, err := LoadMetadata(ctMeta.FQN())
-    mdExists = err == nil && len(md.Daemons) != 0
+        md, err  = LoadMetadata(ctMeta.FQN())
+        mdExists = err == nil && len(md.Daemons) != 0
+    )
     if mdExists {
         // Metafile exists and contains a list of targets
-        nodes := md.RemoteTargets(c.parent.t)
+        nodes := md.RemoteTargets()
         ctx.nodes = make(map[string]*Metadata, len(nodes))
         for _, node := range nodes {
             wg.Add(1)
-            go requestMeta(node)
+            go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+                ctx.requestMeta(si, c, mtx, mdExists)
+                wg.Done()
+            }(node, c, mtx, mdExists)
         }
     } else {
-        // Otherwise, send cluster-wide request
+        // Otherwise, broadcast
         ctx.nodes = make(map[string]*Metadata, len(tmap))
         for _, node := range tmap {
-            if node.ID() == c.parent.si.ID() {
+            if node.ID() == g.t.SID() {
                 continue
             }
             wg.Add(1)
-            go requestMeta(node)
+            go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+                ctx.requestMeta(si, c, mtx, mdExists)
+                wg.Done()
+            }(node, c, mtx, mdExists)
         }
     }
     wg.Wait()
@@ -879,12 +859,36 @@ func (c *getJogger) requestMeta(ctx *restoreCtx) error {
     // Cleanup: delete all metadatas with "obsolete" information
     for k, v := range ctx.nodes {
-        if v.Generation != gen {
+        if v.Generation != ctx.meta.Generation {
             nlog.Warningf("Target %s[slice id %d] old generation: %v == %v",
-                k, v.SliceID, v.Generation, gen)
+                k, v.SliceID, v.Generation, ctx.meta.Generation)
             delete(ctx.nodes, k)
         }
     }
     return nil
 }
+
+////////////////
+// restoreCtx //
+////////////////
+
+func (ctx *restoreCtx) requestMeta(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
+    md, err := RequestECMeta(ctx.lom.Bucket(), ctx.lom.ObjName, si, c.client)
+    if err != nil {
+        if mdExists {
+            nlog.Errorf("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+        } else if c.parent.config.FastV(4, cos.SmoduleEC) {
+            nlog.Infof("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+        }
+        return
+    }
+
+    mtx.Lock()
+    ctx.nodes[si.ID()] = md
+    // Detect the metadata with the latest generation on the fly.
+    if ctx.meta == nil || md.Generation > ctx.meta.Generation {
+        ctx.meta = md
+    }
+    mtx.Unlock()
+}
diff --git a/ec/getxaction.go b/ec/getxaction.go
index 1f25ac6b923..875dcea4f5a 100644
--- a/ec/getxaction.go
+++ b/ec/getxaction.go
@@ -87,15 +87,13 @@ func newGetXact(bck *cmn.Bck, mgr *Manager) *XactGet {
     var (
         avail, disabled = fs.Get()
         totalPaths      = len(avail) + len(disabled)
-        smap            = g.t.Sowner()
-        si              = g.t.Snode()
         config          = cmn.GCO.Get()
         xctn            = &XactGet{
-            getJoggers:  make(map[string]*getJogger, totalPaths),
-            xactECBase:  newXactECBase(g.t, smap, si, config, bck, mgr),
-            xactReqBase: newXactReqECBase(),
+            getJoggers: make(map[string]*getJogger, totalPaths),
         }
     )
+    xctn.xactECBase.init(config, bck, mgr)
+    xctn.xactReqBase.init()
 
     // create all runners but do not start them until Run is called
     for mpath := range avail {
@@ -127,13 +125,13 @@ func (r *XactGet) DispatchResp(iReq intraReq, hdr *transport.ObjHdr, bck *meta.B
     r.dOwner.mtx.Unlock()
 
     if !ok {
-        err := fmt.Errorf("%s: no slice writer for %s (uname %s)", r.t, bck.Cname(objName), uname)
+        err := fmt.Errorf("%s: no slice writer for %s (uname %s)", g.t, bck.Cname(objName), uname)
         nlog.Errorln(err)
         r.AddErr(err)
         return
     }
     if err := _writerReceive(writer, iReq.exists, objAttrs, reader); err != nil {
-        err = fmt.Errorf("%s: failed to read %s replica: %w (uname %s)", r.t, bck.Cname(objName), err, uname)
+        err = fmt.Errorf("%s: failed to read %s replica: %w (uname %s)", g.t, bck.Cname(objName), err, uname)
         nlog.Errorln(err)
         r.AddErr(err)
     }
diff --git a/ec/manager.go b/ec/manager.go
index 4be4462e9cc..8cb9785103a 100644
--- a/ec/manager.go
+++ b/ec/manager.go
@@ -26,8 +26,7 @@ import (
 )
 
 type Manager struct {
-    smap *meta.Smap
-    bmd  *meta.BMD // bmd owner
+    bmd *meta.BMD
 
     xacts map[string]*BckXacts // bckName -> xctn map, only ais buckets allowed, no naming collisions
@@ -38,8 +37,7 @@ type Manager struct {
     reqBundle  ratomic.Pointer[bundle.Streams]
     respBundle ratomic.Pointer[bundle.Streams]
 
-    targetCnt     atomic.Int32 // atomic, to avoid races between read/write on smap
-    bundleEnabled atomic.Bool  // to disable and enable on the fly
+    bundleEnabled atomic.Bool // to disable and enable on the fly
 
     mu sync.RWMutex
 }
@@ -49,25 +47,17 @@ var (
     errSkipped = errors.New("skipped") // CT is skipped due to EC unsupported for the content type
 )
 
-func initManager() error {
-    var (
-        netReq, netResp = cmn.NetIntraControl, cmn.NetIntraData
-        sowner          = g.t.Sowner()
-        smap            = sowner.Get()
-    )
+func initManager() (err error) {
     ECM = &Manager{
-        netReq:    netReq,
-        netResp:   netResp,
-        smap:      smap,
-        targetCnt: *atomic.NewInt32(int32(smap.CountActiveTs())),
-        bmd:       g.t.Bowner().Get(),
-        xacts:     make(map[string]*BckXacts),
+        netReq:  cmn.NetIntraControl,
+        netResp: cmn.NetIntraData,
+        bmd:     g.t.Bowner().Get(),
+        xacts:   make(map[string]*BckXacts),
     }
     if ECM.bmd.IsECUsed() {
-        return ECM.initECBundles()
+        err = ECM.initECBundles()
     }
-    return nil
+    return err
 }
 
 func (mgr *Manager) req() *bundle.Streams { return mgr.reqBundle.Load() }
@@ -111,9 +101,6 @@ func (mgr *Manager) initECBundles() error {
     mgr.reqBundle.Store(bundle.New(sowner, g.t.Snode(), client, reqSbArgs))
     mgr.respBundle.Store(bundle.New(sowner, g.t.Snode(), client, respSbArgs))
 
-    mgr.smap = sowner.Get()
-    mgr.targetCnt.Store(int32(mgr.smap.CountActiveTs()))
-    sowner.Listeners().Reg(mgr)
     return nil
 }
 
@@ -121,7 +108,6 @@ func (mgr *Manager) closeECBundles() {
     if !mgr.bundleEnabled.CAS(true, false) {
         return
     }
-    g.t.Sowner().Listeners().Unreg(mgr)
     mgr.req().Close(false)
     mgr.resp().Close(false)
     transport.Unhandle(ReqStreamName)
@@ -263,7 +249,7 @@ func (mgr *Manager) recvResponse(hdr *transport.ObjHdr, objReader io.Reader, err
 // - lom - object to encode
 // - intra - if true, it is internal request and has low priority
 // - cb - optional callback that is called after the object is encoded
-func (mgr *Manager) EncodeObject(lom *cluster.LOM, cb ...cluster.OnFinishObj) error {
+func (mgr *Manager) EncodeObject(lom *cluster.LOM, cb cluster.OnFinishObj) error {
     if !lom.Bprops().EC.Enabled {
         return ErrorECDisabled
     }
@@ -271,14 +257,6 @@ func (mgr *Manager) EncodeObject(lom *cluster.LOM, cb ...cluster.OnFinishObj) er
     if err := cs.Err(); err != nil {
         return err
     }
-    isECCopy := IsECCopy(lom.SizeBytes(), &lom.Bprops().EC)
-    targetCnt := mgr.targetCnt.Load()
-
-    // compromise: encoding a small object requires fewer targets
-    if required := lom.Bprops().EC.RequiredEncodeTargets(); !isECCopy && int(targetCnt) < required {
-        return fmt.Errorf("%v: %d targets required to erasure code %s (have %d)",
-            cmn.ErrNotEnoughTargets, required, lom, targetCnt)
-    }
     spec, _ := fs.CSM.FileSpec(lom.FQN)
     if spec != nil && !spec.PermToProcess() {
         return errSkipped
     }
@@ -286,9 +264,9 @@ func (mgr *Manager) EncodeObject(lom *cluster.LOM, cb ...cluster.OnFinishObj) er
     req := allocateReq(ActSplit, lom.LIF())
     req.IsCopy = IsECCopy(lom.SizeBytes(), &lom.Bprops().EC)
-    if len(cb) != 0 {
+    if cb != nil {
         req.rebuild = true
-        req.Callback = cb[0]
+        req.Callback = cb
     }
 
     mgr.RestoreBckPutXact(lom.Bck()).encode(req, lom)
@@ -313,12 +291,6 @@ func (mgr *Manager) RestoreObject(lom *cluster.LOM) error {
     if err := cs.Err(); err != nil {
         return err
     }
-    targetCnt := mgr.targetCnt.Load()
-    // NOTE: Restore replica object is done with GFN, safe to always abort.
-    if required := lom.Bprops().EC.RequiredRestoreTargets(); int(targetCnt) < required {
-        return fmt.Errorf("%v: %d targets required to EC-restore %s (have %d)",
-            cmn.ErrNotEnoughTargets, required, lom, targetCnt)
-    }
     debug.Assert(lom.Mountpath() != nil && lom.Mountpath().Path != "")
 
     req := allocateReq(ActRestore, lom.LIF())
@@ -344,27 +316,30 @@ func (mgr *Manager) enableBck(bck *meta.Bck) {
     mgr.RestoreBckPutXact(bck).EnableRequests()
 }
 
-func (mgr *Manager) BucketsMDChanged() error {
+func (mgr *Manager) BMDChanged() error {
     mgr.mu.Lock()
-    newBckMD := g.t.Bowner().Get()
-    oldBckMD := mgr.bmd
-    if newBckMD.Version <= mgr.bmd.Version {
+    newBMD := g.t.Bowner().Get()
+    oldBMD := mgr.bmd
+    if newBMD.Version <= mgr.bmd.Version {
         mgr.mu.Unlock()
         return nil
     }
-    mgr.bmd = newBckMD
+    mgr.bmd = newBMD
     mgr.mu.Unlock()
 
-    if newBckMD.IsECUsed() && !oldBckMD.IsECUsed() {
+    // globally
+    if newBMD.IsECUsed() && !oldBMD.IsECUsed() {
         if err := mgr.initECBundles(); err != nil {
             return err
         }
-    } else if !newBckMD.IsECUsed() && oldBckMD.IsECUsed() {
+    } else if !newBMD.IsECUsed() && oldBMD.IsECUsed() {
         mgr.closeECBundles()
+        return nil
     }
-    provider := apc.AIS
-    newBckMD.Range(&provider, nil, func(nbck *meta.Bck) bool {
-        oprops, ok := oldBckMD.Get(nbck)
+
+    // by bucket
+    newBMD.Range(nil, nil, func(nbck *meta.Bck) bool {
+        oprops, ok := oldBMD.Get(nbck)
         if !ok {
             if nbck.Props.EC.Enabled {
                 mgr.enableBck(nbck)
@@ -381,48 +356,3 @@
         return false
     })
     return nil
 }
-
-func (mgr *Manager) ListenSmapChanged() {
-    smap := g.t.Sowner().Get()
-    if smap.Version <= mgr.smap.Version {
-        return
-    }
-
-    mgr.smap = g.t.Sowner().Get()
-    targetCnt := mgr.smap.CountActiveTs()
-    mgr.targetCnt.Store(int32(targetCnt))
-
-    mgr.mu.Lock()
-
-    // Manager is initialized before being registered for smap changes
-    // bckMD will be present at this point
-    // stopping relevant EC xactions which can't be satisfied with current number of targets
-    // respond xaction is never stopped as it should respond regardless of the other targets
-    provider := apc.AIS
-    mgr.bmd.Range(&provider, nil, func(bck *meta.Bck) bool {
-        bckName, bckProps := bck.Name, bck.Props
-        bckXacts := mgr.getBckXactsUnlocked(bckName)
-        if !bckProps.EC.Enabled {
-            return false
-        }
-        if required := bckProps.EC.RequiredEncodeTargets(); targetCnt < required {
-            nlog.Warningf("not enough targets for EC encoding for bucket %s; actual: %v, expected: %v",
-                bckName, targetCnt, required)
-            bckXacts.AbortPut()
-        }
-        // NOTE: this doesn't guarantee that present targets are sufficient to restore an object
-        // if one target was killed, and a new one joined, this condition will be satisfied even though
-        // slices of the object are not present on the new target
-        if required := bckProps.EC.RequiredRestoreTargets(); targetCnt < required {
-            nlog.Warningf("not enough targets for EC restoring for bucket %s; actual: %v, expected: %v",
-                bckName, targetCnt, required)
-            bckXacts.AbortGet()
-        }
-        return false
-    })
-
-    mgr.mu.Unlock()
-}
-
-// implementing cluster.Slistener interface
-func (*Manager) String() string { return "ecmanager" }
diff --git a/ec/metafile.go b/ec/metafile.go
index 6adf2333fed..030dffcb460 100644
--- a/ec/metafile.go
+++ b/ec/metafile.go
@@ -74,14 +74,14 @@ func MetaFromReader(reader io.Reader) (*Metadata, error) {
 // RemoteTargets returns list of Snodes that contain a slice or replica.
 // This target(`t`) is removed from the list.
-func (md *Metadata) RemoteTargets(t cluster.Target) []*meta.Snode {
+func (md *Metadata) RemoteTargets() []*meta.Snode {
     if len(md.Daemons) == 0 {
         return nil
     }
     nodes := make([]*meta.Snode, 0, len(md.Daemons))
-    smap := t.Sowner().Get()
+    smap := g.t.Sowner().Get()
     for tid := range md.Daemons {
-        if tid == t.SID() {
+        if tid == g.t.SID() {
             continue
         }
         tsi := smap.GetTarget(tid)
diff --git a/ec/putjogger.go b/ec/putjogger.go
index b0e82e82273..e0b4995e5cb 100644
--- a/ec/putjogger.go
+++ b/ec/putjogger.go
@@ -133,7 +133,7 @@ func (c *putJogger) processRequest(req *request) {
     c.parent.stats.updateWaitTime(time.Since(req.tm))
     req.tm = time.Now()
     if err = c.ec(req, lom); err != nil {
-        err = fmt.Errorf("%s: failed to %s %s: %w", c.parent.t, req.Action, lom.StringEx(), err)
+        err = fmt.Errorf("%s: failed to %s %s: %w", g.t, req.Action, lom.StringEx(), err)
         nlog.Errorln(err)
         c.parent.AddErr(err)
     }
@@ -142,7 +142,7 @@ func (c *putJogger) run(wg *sync.WaitGroup) {
     nlog.Infof("Started EC for mountpath: %s, bucket %s", c.mpath, c.parent.bck)
     defer wg.Done()
-    c.buffer, c.slab = g.mm.Alloc()
+    c.buffer, c.slab = g.pmm.Alloc()
     for {
         select {
         case req := <-c.putCh:
@@ -211,29 +211,28 @@ func (c *putJogger) splitAndDistribute(ctx *encodeCtx) error {
 // calculates and stores data and parity slices
 func (c *putJogger) encode(req *request, lom *cluster.LOM) error {
-    var (
-        cksumValue, cksumType string
-        ecConf                = lom.Bprops().EC
-    )
     if c.parent.config.FastV(4, cos.SmoduleEC) {
-        nlog.Infof("Encoding %q...", lom.FQN)
+        nlog.Infof("Encoding %q...", lom)
     }
-    if lom.Checksum() != nil {
-        cksumType, cksumValue = lom.Checksum().Get()
-    }
-    reqTargets := ecConf.ParitySlices + 1
+    var (
+        ecConf     = lom.Bprops().EC
+        reqTargets = ecConf.ParitySlices + 1
+        smap       = g.t.Sowner().Get()
+    )
     if !req.IsCopy {
         reqTargets += ecConf.DataSlices
     }
-    smap := c.parent.smap.Get()
     targetCnt := smap.CountActiveTs()
     if targetCnt < reqTargets {
         return fmt.Errorf("%v: given EC config (d=%d, p=%d), %d targets required to encode %s (have %d, %s)",
             cmn.ErrNotEnoughTargets, ecConf.DataSlices, ecConf.ParitySlices, reqTargets, lom, targetCnt, smap.StringEx())
     }
-    ctMeta := cluster.NewCTFromLOM(lom, fs.ECMetaType)
-    generation := mono.NanoTime()
+    var (
+        ctMeta                = cluster.NewCTFromLOM(lom, fs.ECMetaType)
+        generation            = mono.NanoTime()
+        cksumType, cksumValue = lom.Checksum().Get()
+    )
     meta := &Metadata{
         MDVersion:  MDVersionLast,
         Generation: generation,
@@ -243,7 +242,7 @@ func (c *putJogger) encode(req *request, lom *cluster.LOM) error {
         IsCopy:      req.IsCopy,
         ObjCksum:    cksumValue,
         CksumType:   cksumType,
-        FullReplica: c.parent.t.SID(),
+        FullReplica: g.t.SID(),
         Daemons:     make(cos.MapStrUint16, reqTargets),
     }
@@ -277,10 +276,10 @@ func (c *putJogger) encode(req *request, lom *cluster.LOM) error {
         return err
     }
     metaBuf := bytes.NewReader(meta.NewPack())
-    if err := ctMeta.Write(c.parent.t, metaBuf, -1); err != nil {
+    if err := ctMeta.Write(g.t, metaBuf, -1); err != nil {
         return err
     }
-    if _, exists := c.parent.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
+    if _, exists := g.t.Bowner().Get().Get(ctMeta.Bck()); !exists {
         if errRm := cos.RemoveFile(ctMeta.FQN()); errRm != nil {
             nlog.Errorf("nested error: encode -> remove metafile: %v", errRm)
         }
@@ -290,7 +289,7 @@ func (c *putJogger) encode(req *request, lom *cluster.LOM) error {
 }
 
 func (c *putJogger) ctSendCallback(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
-    c.parent.t.ByteMM().Free(hdr.Opaque)
+    g.smm.Free(hdr.Opaque)
     if err != nil {
         nlog.Errorf("failed to send o[%s]: %v", hdr.Cname(), err)
     }
@@ -309,13 +308,12 @@ func (c *putJogger) cleanup(lom *cluster.LOM) error {
         }
         return err
     }
-    nodes := md.RemoteTargets(c.parent.t)
+    nodes := md.RemoteTargets()
     if err := cos.RemoveFile(ctMeta.FQN()); err != nil {
         return err
     }
 
-    mm := c.parent.t.ByteMM()
-    request := newIntraReq(reqDel, nil, lom.Bck()).NewPack(mm)
+    request := newIntraReq(reqDel, nil, lom.Bck()).NewPack(g.smm)
     o := transport.AllocSend()
     o.Hdr = transport.ObjHdr{ObjName: lom.ObjName, Opaque: request, Opcode: reqDel}
     o.Hdr.Bck.Copy(lom.Bucket())
@@ -364,7 +362,7 @@ func generateSlicesToMemory(ctx *encodeCtx) error {
         sliceWriters = make([]io.Writer, ctx.paritySlices)
     )
     for i := 0; i < ctx.paritySlices; i++ {
-        writer := g.mm.NewSGL(initSize)
+        writer := g.pmm.NewSGL(initSize)
         ctx.slices[i+ctx.dataSlices] = &slice{obj: writer}
         if cksumType == cos.ChecksumNone {
             sliceWriters[i] = writer
diff --git a/ec/putxaction.go b/ec/putxaction.go
index d230e73e3b6..d2cb0f8253e 100644
--- a/ec/putxaction.go
+++ b/ec/putxaction.go
@@ -88,15 +88,13 @@ func newPutXact(bck *cmn.Bck, mgr *Manager) *XactPut {
     var (
         avail, disabled = fs.Get()
         totalPaths      = len(avail) + len(disabled)
-        smap            = g.t.Sowner()
-        si              = g.t.Snode()
         config          = cmn.GCO.Get()
         xctn            = &XactPut{
-            putJoggers:  make(map[string]*putJogger, totalPaths),
-            xactECBase:  newXactECBase(g.t, smap, si, config, bck, mgr),
-            xactReqBase: newXactReqECBase(),
+            putJoggers: make(map[string]*putJogger, totalPaths),
         }
     )
+    xctn.xactECBase.init(config, bck, mgr)
+    xctn.xactReqBase.init()
 
     // create all runners but do not start them until Run is called
     for mpath := range avail {
diff --git a/ec/respondxaction.go b/ec/respondxaction.go
index 65ddb41c3ba..8b91cb73149 100644
--- a/ec/respondxaction.go
+++ b/ec/respondxaction.go
@@ -72,12 +72,9 @@ func (p *rspFactory) Start() error {
 /////////////////
 
 func newRespondXact(bck *cmn.Bck, mgr *Manager) *XactRespond {
-    var (
-        config = cmn.GCO.Get()
-        smap   = g.t.Sowner()
-        si     = g.t.Snode()
-    )
-    return &XactRespond{xactECBase: newXactECBase(g.t, smap, si, config, bck, mgr)}
+    xctn := &XactRespond{}
+    xctn.xactECBase.init(cmn.GCO.Get(), bck, mgr)
+    return xctn
 }
 
 func (r *XactRespond) Run(*sync.WaitGroup) {
@@ -110,7 +107,7 @@ func (r *XactRespond) removeObjAndMeta(bck *meta.Bck, objName string) error {
         nlog.Infof("Delete request for %s", bck.Cname(objName))
     }
 
-    ct, err := cluster.NewCTFromBO(bck.Bucket(), objName, r.t.Bowner(), fs.ECSliceType)
+    ct, err := cluster.NewCTFromBO(bck.Bucket(), objName, g.t.Bowner(), fs.ECSliceType)
     if err != nil {
         return err
     }
@@ -146,7 +143,7 @@ func (r *XactRespond) trySendCT(iReq intraReq, hdr *transport.ObjHdr, bck *meta.
         nlog.Infof("Received request for slice %d of %s", iReq.meta.SliceID, objName)
     }
     if iReq.isSlice {
-        ct, err := cluster.NewCTFromBO(bck.Bucket(), objName, r.t.Bowner(), fs.ECSliceType)
+        ct, err := cluster.NewCTFromBO(bck.Bucket(), objName, g.t.Bowner(), fs.ECSliceType)
         if err != nil {
             return err
         }
@@ -168,7 +165,7 @@ func (r *XactRespond) DispatchReq(iReq intraReq, hdr *transport.ObjHdr, bck *met
     case reqDel:
         // object cleanup request: delete replicas, slices and metafiles
         if err := r.removeObjAndMeta(bck, hdr.ObjName); err != nil {
-            err = fmt.Errorf("%s: failed to delete %s: %w", r.t, bck.Cname(hdr.ObjName), err)
+            err = fmt.Errorf("%s: failed to delete %s: %w", g.t, bck.Cname(hdr.ObjName), err)
             nlog.Errorln(err)
             r.AddErr(err)
         }
@@ -199,7 +196,7 @@ func (r *XactRespond) DispatchResp(iReq intraReq, hdr *transport.ObjHdr, object
         meta = iReq.meta
     )
     if meta == nil {
-        nlog.Errorf("%s: no metadata for %s", r.t, hdr.Cname())
+        nlog.Errorf("%s: no metadata for %s", g.t, hdr.Cname())
         return
     }
diff --git a/ec/xaction.go b/ec/xaction.go
index bda790a64b3..da4c0fc70ea 100644
--- a/ec/xaction.go
+++ b/ec/xaction.go
@@ -30,10 +30,6 @@ const (
 type (
     xactECBase struct {
         xact.DemandBase
-        t cluster.Target
-
-        smap meta.Sowner // to get current cluster map
-        si   *meta.Snode // target daemonInfo
         config *cmn.Config // config
         stats  stats       // EC statistics
         bck    cmn.Bck     // which bucket xctn belongs to
@@ -67,30 +63,21 @@ type (
     }
 )
 
+func (r *xactECBase) init(config *cmn.Config, bck *cmn.Bck, mgr *Manager) {
+    r.stats = stats{bck: *bck}
+    r.config = config
+    r.bck = *bck
+    r.dOwner = &dataOwner{slices: make(map[string]*slice, 10)}
+    r.mgr = mgr
+}
+
 /////////////////
 // xactReqBase //
 /////////////////
 
-func newXactReqECBase() xactReqBase {
-    return xactReqBase{
-        mpathReqCh: make(chan mpathReq, 1),
-        controlCh:  make(chan RequestsControlMsg, 8),
-    }
-}
-
-func newXactECBase(t cluster.Target, smap meta.Sowner, si *meta.Snode, config *cmn.Config, bck *cmn.Bck, mgr *Manager) xactECBase {
-    return xactECBase{
-        t:      t,
-        smap:   smap,
-        si:     si,
-        stats:  stats{bck: *bck},
-        config: config,
-        bck:    *bck,
-        dOwner: &dataOwner{
-            slices: make(map[string]*slice, 10),
-        },
-        mgr: mgr,
-    }
+func (r *xactReqBase) init() {
+    r.mpathReqCh = make(chan mpathReq, 1)
+    r.controlCh = make(chan RequestsControlMsg, 8)
 }
 
 // ClearRequests disables receiving new EC requests, they will be terminated with error
@@ -194,7 +181,7 @@ func (r *xactECBase) dataResponse(act intraReqType, hdr *transport.ObjHdr, fqn s
     rHdr := transport.ObjHdr{ObjName: objName, ObjAttrs: objAttrs, Opcode: act}
     rHdr.Bck.Copy(bck.Bucket())
-    rHdr.Opaque = ireq.NewPack(r.t.ByteMM())
+    rHdr.Opaque = ireq.NewPack(g.smm)
 
     o := transport.AllocSend()
     o.Hdr, o.Callback = rHdr, r.sendCb
@@ -205,7 +192,7 @@ func (r *xactECBase) dataResponse(act intraReqType, hdr *transport.ObjHdr, fqn s
 }
 
 func (r *xactECBase) sendCb(hdr *transport.ObjHdr, _ io.ReadCloser, _ any, err error) {
-    r.t.ByteMM().Free(hdr.Opaque)
+    g.smm.Free(hdr.Opaque)
     if err != nil {
         err = fmt.Errorf("failed to send %s: %w", hdr.Cname(), err)
         nlog.Errorln(err)
     }
@@ -229,7 +216,7 @@ func (r *xactECBase) sendByDaemonID(daemonIDs []string, o *transport.Obj, reader
     var (
         err   error
         nodes = meta.AllocNodes(len(daemonIDs))
-        smap  = r.smap.Get()
+        smap  = g.t.Sowner().Get()
     )
     for _, id := range daemonIDs {
         si, ok := smap.Tmap[id]
@@ -342,8 +329,7 @@ func (r *xactECBase) writeRemote(daemonIDs []string, lom *cluster.LOM, src *data
     req := newIntraReq(src.reqType, src.metadata, lom.Bck())
     req.isSlice = src.isSlice
 
-    mm := r.t.ByteMM()
-    putData := req.NewPack(mm)
+    putData := req.NewPack(g.smm)
     objAttrs := cmn.ObjAttrs{
         Size: src.size,
         Ver:  lom.Version(),
@@ -367,7 +353,7 @@ func (r *xactECBase) writeRemote(daemonIDs []string, lom *cluster.LOM, src *data
     hdr.Bck.Copy(lom.Bucket())
     oldCallback := cb
     cb = func(hdr *transport.ObjHdr, reader io.ReadCloser, arg any, err error) {
-        mm.Free(hdr.Opaque)
+        g.smm.Free(hdr.Opaque)
         if oldCallback != nil {
             oldCallback(hdr, reader, arg, err)
         }
@@ -394,7 +380,7 @@ func _writerReceive(writer *slice, exists bool, objAttrs cmn.ObjAttrs, reader io
         return ErrorNotFound
     }
 
-    buf, slab := g.mm.Alloc()
+    buf, slab := g.pmm.Alloc()
     writer.n, err = io.CopyBuffer(writer.writer, reader, buf)
     writer.cksum = objAttrs.Cksum
     if writer.version == "" && objAttrs.Ver != "" {
diff --git a/reb/recv.go b/reb/recv.go
index ce0c154a792..f9e10652cf5 100644
--- a/reb/recv.go
+++ b/reb/recv.go
@@ -312,7 +312,7 @@ func (reb *Reb) receiveCT(req *stageNtfn, hdr *transport.ObjHdr, reader io.Reade
     }
     // Broadcast updated MD
     ntfnMD := stageNtfn{daemonID: reb.t.SID(), stage: rebStageTraverse, rebID: reb.rebID.Load(), md: req.md, action: rebActUpdateMD}
-    nodes := req.md.RemoteTargets(reb.t)
+    nodes := req.md.RemoteTargets()
     for _, tsi := range nodes {
         if moveTo != nil && moveTo.ID() == tsi.ID() {
             continue