WaitStage for Grafana Alertmanager #278

Open
wants to merge 1 commit into base: main
6 changes: 3 additions & 3 deletions notify/grafana_alertmanager.go
@@ -717,7 +717,7 @@ func (am *GrafanaAlertmanager) ApplyConfig(cfg Configuration) (err error) {
	var receivers []*nfstatus.Receiver
	activeReceivers := GetActiveReceiversMap(am.route)
	for name := range integrationsMap {
-		stage := am.createReceiverStage(name, nfstatus.GetIntegrations(integrationsMap[name]), am.waitFunc, am.notificationLog, am.pipelineAndStateTimestampsMismatchAction)
+		stage := am.createReceiverStage(name, nfstatus.GetIntegrations(integrationsMap[name]), am.notificationLog, am.pipelineAndStateTimestampsMismatchAction)
		routingStage[name] = notify.MultiStage{meshStage, silencingStage, timeMuteStage, inhibitionStage, stage}
		_, isActive := activeReceivers[name]

@@ -884,7 +884,7 @@ func (e AlertValidationError) Error() string {
}

// createReceiverStage creates a pipeline of stages for a receiver.
-func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog, act stages.Action) notify.Stage {
+func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*notify.Integration, notificationLog notify.NotificationLog, act stages.Action) notify.Stage {
	var fs notify.FanoutStage
	for i := range integrations {
		recv := &nflogpb.Receiver{
@@ -893,7 +893,7 @@ func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*
			Idx: uint32(integrations[i].Index()),
		}
		var s notify.MultiStage
-		s = append(s, notify.NewWaitStage(wait))
+		s = append(s, stages.NewWaitStage(am.peer, am.peerTimeout))
		s = append(s, notify.NewDedupStage(integrations[i], notificationLog, recv))
		stage := stages.NewPipelineAndStateTimestampCoordinationStage(notificationLog, recv, act)
		if stage != nil {
62 changes: 62 additions & 0 deletions notify/stages/wait_stage.go
@@ -0,0 +1,62 @@
package stages

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

type PeerInfo interface {
	Position() int
}

// WaitStage waits for a certain amount of time before continuing or until the
// context is done.
type WaitStage struct {
	peer    PeerInfo
	timeout time.Duration
}

// NewWaitStage returns a new WaitStage.
func NewWaitStage(p PeerInfo, peerTimeout time.Duration) *WaitStage {
	return &WaitStage{
		peer: p,
		timeout: peerTimeout,
	}
}

// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if ws.peer == nil {
		return ctx, alerts, nil
	}
	peerPosition := ws.peer.Position()
	wait := time.Duration(peerPosition) * ws.timeout
	if wait == 0 {
		return ctx, alerts, nil
	}

	t := time.NewTimer(wait)
	defer t.Stop()
Review comment on lines +43 to +44 (the t := time.NewTimer(wait) / defer t.Stop() lines):
Contributor:

Didn't know about this, good catch. It seems like this was fixed in 1.23 though. As per the time.After docs:

Before Go 1.23, this documentation warned that the underlying Timer would not be recovered by the garbage collector until the timer fired, and that if efficiency was a concern, code should use NewTimer instead and call Timer.Stop if the timer is no longer needed. As of Go 1.23, the garbage collector can recover unreferenced, unstopped timers. There is no reason to prefer NewTimer when After will do.

Considering Grafana is already in 1.23, we could consider updating alerting and get this for free. We could keep this for the logs in any case - wdyt?

Contributor Author:

Good catch :) can change it back then


	select {
	case <-t.C:
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}

	gkey, _ := notify.GroupKey(ctx)
	timeNow, _ := notify.Now(ctx)
	level.Debug(l).Log(
		"msg", "continue pipeline after waiting",
		"aggrGroup", gkey,
		"timeout", wait,
		"peer_position", peerPosition,
		"pipeline_time", timeNow,
	)
	return ctx, alerts, nil
}
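
For reference, a minimal sketch of the time.After form discussed in the review thread above. It is not part of this diff; waitOrAbort is a hypothetical helper name, and the snippet assumes only the standard context and time packages:

// Sketch only: on Go 1.23+ the timer behind time.After is garbage-collected
// even if it never fires (per the time.After docs quoted above), so an
// explicit NewTimer/Stop pair is no longer needed for efficiency.
func waitOrAbort(ctx context.Context, wait time.Duration) error {
	select {
	case <-time.After(wait):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}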
83 changes: 83 additions & 0 deletions notify/stages/wait_stage_test.go
@@ -0,0 +1,83 @@
package stages

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/alertmanager/types"
	"github.com/stretchr/testify/assert"
)

type mockPeer struct {
	position int
}

func (m *mockPeer) Position() int {
	return m.position
}

func TestWaitStageExec(t *testing.T) {
	alerts := []*types.Alert{{}, {}, {}}
	tests := []struct {
		name           string
		peer           PeerInfo
		timeout        time.Duration
		contextTimeout time.Duration
		expectedErr    error
	}{
		{
			name: "should not wait if no peer",
			peer: nil,
			timeout: 10 * time.Second,
			expectedErr: nil,
		},
		{
			name: "should not wait if peer with zero position",
			peer: &mockPeer{position: 0},
			timeout: 10 * time.Second,
			expectedErr: nil,
		},
		{
			name: "should wait for peer*timeout if peer with non-zero position",
			peer: &mockPeer{position: 1},
			timeout: 100 * time.Millisecond,
			expectedErr: nil,
		},
		{
			name: "context timeout",
			peer: &mockPeer{position: 2},
			timeout: time.Second,
			contextTimeout: 100 * time.Millisecond,
			expectedErr: context.DeadlineExceeded,
		},
	}
}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			if tc.contextTimeout > 0 {
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, tc.contextTimeout)
				defer cancel()
			}

			logger := log.NewNopLogger()
			ws := &WaitStage{
				peer: tc.peer,
				timeout: tc.timeout,
			}

			gotCtx, gotAlerts, gotErr := ws.Exec(ctx, logger, alerts...)

			assert.Equal(t, ctx, gotCtx)
			if tc.expectedErr != nil {
				assert.ErrorIs(t, gotErr, tc.expectedErr)
			} else {
				assert.NoError(t, gotErr)
				assert.Equal(t, alerts, gotAlerts)
			}
		})
	}
}
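
A usage sketch of the new stage follows. The wiring is hypothetical and not taken from this PR: staticPeer, the 15s timeout, and the github.com/grafana/alerting module path are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/alerting/notify/stages"
	"github.com/prometheus/alertmanager/types"
)

// staticPeer is a stand-in PeerInfo implementation for illustration.
type staticPeer struct{ pos int }

func (p staticPeer) Position() int { return p.pos }

func main() {
	// wait = position * timeout: position 0 proceeds immediately,
	// position 1 waits 15s, position 2 waits 30s, and so on.
	ws := stages.NewWaitStage(staticPeer{pos: 0}, 15*time.Second)
	_, alerts, err := ws.Exec(context.Background(), log.NewNopLogger(), &types.Alert{})
	fmt.Println(len(alerts), err)
}

With position 0 the stage is a no-op, so the first peer notifies immediately while later peers wait long enough for the dedup stage that follows in the pipeline to drop already-sent notifications.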