Skip to content

Commit

Permalink
core: intra-cluster notifications - add 'aborted'
Browse files Browse the repository at this point in the history
* extend notification message: add 'aborted'
* motivation: xaction snap (stats) is optional and may not carry the bit
* with minor refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 6, 2024
1 parent a1acac2 commit db1b43e
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 127 deletions.
13 changes: 8 additions & 5 deletions ais/htrun.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand Down Expand Up @@ -665,14 +665,16 @@ func (h *htrun) call(args *callArgs, smap *smapX) (res *callResult) {
// intra-cluster IPC, control plane: notify another node
//

func (h *htrun) notifyTerm(n core.Notif, err error) { h._nfy(n, err, apc.Finished) }
func (h *htrun) notifyProgress(n core.Notif) { h._nfy(n, nil, apc.Progress) }
func (h *htrun) notifyTerm(n core.Notif, err error, aborted bool) {
h._nfy(n, err, apc.Finished, aborted)
}
func (h *htrun) notifyProgress(n core.Notif) { h._nfy(n, nil, apc.Progress, false) }

func (h *htrun) _nfy(n core.Notif, err error, upon string) {
func (h *htrun) _nfy(n core.Notif, err error, upon string, aborted bool) {
var (
smap = h.owner.smap.get()
dsts = n.Subscribers()
msg = n.ToNotifMsg()
msg = n.ToNotifMsg(aborted)
args = allocBcArgs()
nodes = args.selected
)
Expand All @@ -695,6 +697,7 @@ func (h *htrun) _nfy(n core.Notif, err error, upon string) {
}
if err != nil {
msg.ErrMsg = err.Error()
msg.AbortedX = aborted
}
msg.NodeID = h.si.ID()
if len(nodes) == 0 {
Expand Down
58 changes: 26 additions & 32 deletions ais/prxnotif.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand Down Expand Up @@ -101,7 +101,6 @@ func (n *notifs) handler(w http.ResponseWriter, r *http.Request) {
var (
notifMsg = &core.NotifMsg{}
nl nl.Listener
errMsg error
uuid string
tid = r.Header.Get(apc.HdrCallerID) // sender node ID
)
Expand Down Expand Up @@ -152,57 +151,52 @@ func (n *notifs) handler(w http.ResponseWriter, r *http.Request) {
}
nl.RUnlock()

if notifMsg.ErrMsg != "" {
errMsg = errors.New(notifMsg.ErrMsg)
}

// NOTE: Default case is not required - will reach here only for valid types.
switch apiItems[0] {
// TODO: implement on Started notification
case apc.Progress:
err = n.handleProgress(nl, tsi, notifMsg.Data, errMsg)
nl.Lock()
n._progress(nl, tsi, notifMsg)
nl.Unlock()
case apc.Finished:
err = n.handleFinished(nl, tsi, notifMsg.Data, errMsg)
}

if err != nil {
n.p.writeErr(w, r, err)
}
n._finished(nl, tsi, notifMsg)
} // default not needed - cannot happen
}

func (*notifs) handleProgress(nl nl.Listener, tsi *meta.Snode, data []byte, srcErr error) (err error) {
nl.Lock()
defer nl.Unlock()

if srcErr != nil {
nl.AddErr(srcErr)
func (*notifs) _progress(nl nl.Listener, tsi *meta.Snode, msg *core.NotifMsg) {
if msg.ErrMsg != "" {
nl.AddErr(errors.New(msg.ErrMsg))
}
if data != nil {
stats, _, _, err := nl.UnmarshalStats(data)
// when defined, `data must be valid encoded stats
if msg.Data != nil {
stats, _, _, err := nl.UnmarshalStats(msg.Data)
debug.AssertNoErr(err)
nl.SetStats(tsi.ID(), stats)
}
return
}

func (n *notifs) handleFinished(nl nl.Listener, tsi *meta.Snode, data []byte, srcErr error) (err error) {
func (n *notifs) _finished(nl nl.Listener, tsi *meta.Snode, msg *core.NotifMsg) {
var (
stats any
aborted bool
srcErr error
done bool
aborted = msg.AbortedX
)
nl.Lock()
// data can either be `nil` or a valid encoded stats
if data != nil {
stats, _, aborted, err = nl.UnmarshalStats(data)
if msg.Data != nil {
// ditto
stats, _, abortedSnap, err := nl.UnmarshalStats(msg.Data)
debug.AssertNoErr(err)
nl.SetStats(tsi.ID(), stats)
debug.Assertf(abortedSnap == msg.AbortedX, "%s: %t vs %t [%s]", msg, abortedSnap, msg.AbortedX, nl.String())
aborted = aborted || abortedSnap
}
if msg.ErrMsg != "" {
srcErr = errors.New(msg.ErrMsg)
}
done := n.markFinished(nl, tsi, srcErr, aborted)
done = n.markFinished(nl, tsi, srcErr, aborted)
nl.Unlock()

if done {
n.done(nl)
}
return
}

// start listening
Expand Down
41 changes: 21 additions & 20 deletions ais/prxnotif_internal_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package ais

import (
"bytes"
"errors"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -122,10 +121,11 @@ var _ = Describe("Notifications xaction test", func() {
return
}

notifRequest = func(daeID, xid, notifKind string, stats any) *http.Request {
notifRequest = func(daeID, xid, notifKind string, stats *core.Snap) *http.Request {
nm := core.NotifMsg{
UUID: xid,
Data: cos.MustMarshal(stats),
UUID: xid,
Data: cos.MustMarshal(stats),
AbortedX: stats.AbortedX,
}
body := bytes.NewBuffer(cos.MustMarshal(nm))
req := httptest.NewRequest(http.MethodPost, apc.URLPathNotifs.Join(notifKind), body)
Expand Down Expand Up @@ -161,37 +161,36 @@ var _ = Describe("Notifications xaction test", func() {
It("should add node to finished set on receiving finished stats", func() {
Expect(nl.FinCount()).To(BeEquivalentTo(0))
snap := finishedXact(xid)
err := n.handleFinished(nl, targets[target1ID], cos.MustMarshal(snap), nil)
Expect(err).To(BeNil())
msg := &core.NotifMsg{Data: cos.MustMarshal(snap)}
n._finished(nl, targets[target1ID], msg)
Expect(nl.ActiveNotifiers().Contains(target1ID)).To(BeFalse())
Expect(nl.Finished()).To(BeFalse())
})

It("should set error when source sends an error message", func() {
Expect(nl.Err()).To(BeNil())
snap := finishedXact(xid)
srcErr := errors.New("some error")
err := n.handleFinished(nl, targets[target1ID], cos.MustMarshal(snap), srcErr)
Expect(err).To(BeNil())
Expect(srcErr.Error()).To(BeEquivalentTo(nl.Err().Error()))
msg := &core.NotifMsg{Data: cos.MustMarshal(snap), ErrMsg: "some error"}
n._finished(nl, targets[target1ID], msg)
Expect(msg.ErrMsg).To(BeEquivalentTo(nl.Err().Error()))
Expect(nl.ActiveNotifiers().Contains(target1ID)).To(BeFalse())
})

It("should finish when all the Notifiers finished", func() {
Expect(nl.FinCount()).To(BeEquivalentTo(0))
n.add(nl)
snap := finishedXact(xid)
n.handleFinished(nl, targets[target1ID], cos.MustMarshal(snap), nil)
err := n.handleFinished(nl, targets[target2ID], cos.MustMarshal(snap), nil)
Expect(err).To(BeNil())
msg := &core.NotifMsg{Data: cos.MustMarshal(snap)}
n._finished(nl, targets[target1ID], msg)
n._finished(nl, targets[target2ID], msg)
Expect(nl.FinCount()).To(BeEquivalentTo(len(targets)))
Expect(nl.Finished()).To(BeTrue())
})

It("should be done if xaction Aborted", func() {
snap := abortedXact(xid)
err := n.handleFinished(nl, targets[target1ID], cos.MustMarshal(snap), nil)
Expect(err).To(BeNil())
msg := &core.NotifMsg{Data: cos.MustMarshal(snap), AbortedX: snap.AbortedX}
n._finished(nl, targets[target1ID], msg)
Expect(nl.Aborted()).To(BeTrue())
Expect(nl.Err()).NotTo(BeNil())
})
Expand All @@ -208,17 +207,19 @@ var _ = Describe("Notifications xaction test", func() {
statsProgress := baseXact(xid, updatedObjCount, updatedByteCount)

// Handle fist set of stats
err := n.handleProgress(nl, targets[target1ID], cos.MustMarshal(statsFirst), nil)
Expect(err).To(BeNil())
msg := &core.NotifMsg{Data: cos.MustMarshal(statsFirst)}
nl.Lock()
n._progress(nl, targets[target1ID], msg)
nl.Unlock()
val, _ := nl.NodeStats().Load(target1ID)
snap, ok := val.(*core.Snap)
Expect(ok).To(BeTrue())
Expect(snap.Stats.Objs).To(BeEquivalentTo(initObjCount))
Expect(snap.Stats.Bytes).To(BeEquivalentTo(initByteCount))

// Next a Finished notification with stats
err = n.handleFinished(nl, targets[target1ID], cos.MustMarshal(statsProgress), nil)
Expect(err).To(BeNil())
msg = &core.NotifMsg{Data: cos.MustMarshal(statsProgress)}
n._finished(nl, targets[target1ID], msg)
val, _ = nl.NodeStats().Load(target1ID)
snap, ok = val.(*core.Snap)
Expect(ok).To(BeTrue())
Expand Down
43 changes: 26 additions & 17 deletions ais/prxtxn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Package ais provides core functionality for the AIStore object storage.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package ais

Expand Down Expand Up @@ -618,9 +618,10 @@ func (p *proxy) tcb(bckFrom, bckTo *meta.Bck, msg *apc.ActMsg, dryRun bool) (xid
// 4. IC
nl := xact.NewXactNL(c.uuid, msg.Action, &c.smap.Smap, nil, bckFrom.Bucket(), bckTo.Bucket())
nl.SetOwner(equalIC)
// cleanup upon failure via notification listener callback
// (note synchronous cleanup below)
r := &_rmbck{p, bckTo, existsTo}

// add abort-triggered cleanup via notifications
// (also, note immediate cleanup below on failure to commit)
r := &_tcbfin{p, bckTo, existsTo}
nl.F = r.cb
p.ic.registerEqual(regIC{nl: nl, smap: c.smap, query: c.req.Query})

Expand Down Expand Up @@ -1148,27 +1149,35 @@ func (p *proxy) initBackendProp(nprops *cmn.Bprops) (err error) {
return
}

////////////
// _rmbck //
////////////
/////////////
// _tcbfin //
/////////////

type _rmbck struct {
type _tcbfin struct {
p *proxy
bck *meta.Bck
existed bool
}

// TODO: revisit other cleanup
func (r *_rmbck) cb(nl nl.Listener) {
err := nl.Err()
if err == nil {
// NOTE: _may_ remove newly created destination bucket
func (r *_tcbfin) cb(nl nl.Listener) {
var (
err = nl.Err()
aborted = nl.Aborted()
)
switch {
case err == nil:
debug.Assert(!aborted, nl.String())
return
}
if err != cmn.ErrXactUserAbort {
nlog.Errorln(err)
}
if r.existed {
case !aborted:
nlog.Infoln("Warning:", err)
return
default:
nlog.Warningln("abort:", err)
if r.existed {
return
}
}
// when (tcb aborted) and (did not exist prior)
_ = r.p.destroyBucket(&apc.ActMsg{Action: apc.ActDestroyBck}, r.bck)
}
9 changes: 4 additions & 5 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Package cli provides easy-to-use commands to manage, monitor, and utilize AIS clusters.
//
// This file contains common constants and global variables
// (including all command-line options aka flags).
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package cli

Expand All @@ -17,6 +14,8 @@ import (
"github.com/urfave/cli"
)

// Contains common constants and global variables (including all command-line options aka flags).

// top-level commands (categories - nouns)
const (
commandAuth = "auth"
Expand Down Expand Up @@ -590,7 +589,7 @@ var (
syncFlag = cli.BoolFlag{
Name: "sync",
Usage: "synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source;\n" +
indent1 + "\tthe option is a stronger variant of the '--latest' (option) that in addition entails\n" +
indent1 + "\tthe option is a stronger variant of the '--latest' (option) - in addition it entails\n" +
indent1 + "\tremoving of the objects that no longer exist remotely\n" +
indent1 + "\t(see also: 'ais show bucket versioning' and the corresponding documentation)",
}
Expand Down
33 changes: 21 additions & 12 deletions core/notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package core

import (
"fmt"
"strings"
"time"
)

Expand Down Expand Up @@ -34,30 +34,39 @@ type (

// intra-cluster notification interface
Notif interface {
OnFinishedCB() func(Notif, error)
OnFinishedCB() func(Notif, error, bool /*aborted*/)
OnProgressCB() func(Notif)
NotifyInterval() time.Duration // notify interval in secs
LastNotifTime() int64 // time last notified
SetLastNotified(now int64)
Upon(u Upon) bool
Subscribers() []string
ToNotifMsg() NotifMsg
ToNotifMsg(aborted bool) NotifMsg
}
// intra-cluster notification base

// intra-cluster notification message
NotifMsg struct {
UUID string `json:"uuid"` // xaction UUID
NodeID string `json:"node_id"` // notifier node ID
Kind string `json:"kind"` // xaction `Kind`
ErrMsg string `json:"err"` // error.Error()
Data []byte `json:"message"` // typed message
UUID string `json:"uuid"` // xaction UUID
NodeID string `json:"node_id"` // notifier node ID
Kind string `json:"kind"` // xaction `Kind`
ErrMsg string `json:"err"` // error.Error()
Data []byte `json:"message"` // (e.g. usage: custom progress stats)
AbortedX bool `json:"aborted"` // true if aborted (see related: Snap.AbortedX)
}
)

func (msg *NotifMsg) String() (s string) {
s = fmt.Sprintf("nmsg-%s[%s]<=%s", msg.Kind, msg.UUID, msg.NodeID)
var sb strings.Builder
sb.WriteString("nmsg-")
sb.WriteString(msg.Kind)
sb.WriteByte('[')
sb.WriteString(msg.UUID)
sb.WriteByte(']')
sb.WriteString("<=")
sb.WriteString(msg.NodeID)
if msg.ErrMsg != "" {
s += ", err: " + msg.ErrMsg
sb.WriteString(", err: ")
sb.WriteString(msg.ErrMsg)
}
return
return sb.String()
}
Loading

0 comments on commit db1b43e

Please sign in to comment.