Skip to content

Commit

Permalink
implement new wait-stage
Browse files Browse the repository at this point in the history
  • Loading branch information
yuri-tceretian committed Feb 7, 2025
1 parent 995709f commit 20a5381
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 3 deletions.
6 changes: 3 additions & 3 deletions notify/grafana_alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (am *GrafanaAlertmanager) ApplyConfig(cfg Configuration) (err error) {
var receivers []*nfstatus.Receiver
activeReceivers := GetActiveReceiversMap(am.route)
for name := range integrationsMap {
stage := am.createReceiverStage(name, nfstatus.GetIntegrations(integrationsMap[name]), am.waitFunc, am.notificationLog, am.pipelineAndStateTimestampsMismatchAction)
stage := am.createReceiverStage(name, nfstatus.GetIntegrations(integrationsMap[name]), am.notificationLog, am.pipelineAndStateTimestampsMismatchAction)
routingStage[name] = notify.MultiStage{meshStage, silencingStage, timeMuteStage, inhibitionStage, stage}
_, isActive := activeReceivers[name]

Expand Down Expand Up @@ -884,7 +884,7 @@ func (e AlertValidationError) Error() string {
}

// createReceiverStage creates a pipeline of stages for a receiver.
func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*notify.Integration, wait func() time.Duration, notificationLog notify.NotificationLog, act stages.Action) notify.Stage {
func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*notify.Integration, notificationLog notify.NotificationLog, act stages.Action) notify.Stage {
var fs notify.FanoutStage
for i := range integrations {
recv := &nflogpb.Receiver{
Expand All @@ -893,7 +893,7 @@ func (am *GrafanaAlertmanager) createReceiverStage(name string, integrations []*
Idx: uint32(integrations[i].Index()),
}
var s notify.MultiStage
s = append(s, notify.NewWaitStage(wait))
s = append(s, stages.NewWaitStage(am.peer, am.peerTimeout))
s = append(s, notify.NewDedupStage(integrations[i], notificationLog, recv))
stage := stages.NewPipelineAndStateTimestampCoordinationStage(notificationLog, recv, act)
if stage != nil {
Expand Down
62 changes: 62 additions & 0 deletions notify/stages/wait_stage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package stages

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/alertmanager/notify"
"github.com/prometheus/alertmanager/types"
)

// PeerInfo is the minimal view of a peer that WaitStage depends on.
type PeerInfo interface {
// Position returns the integer that WaitStage multiplies by its timeout to
// compute how long to delay; a position of 0 results in no delay.
// NOTE(review): presumably the index of this instance among the cluster
// peers — confirm against the implementation supplied by the caller.
Position() int
}

// WaitStage is a pipeline stage that delays execution by peer.Position()
// multiplied by timeout, returning early with the context's error if the
// context is done first. A nil peer or a zero delay makes the stage a no-op.
type WaitStage struct {
// peer supplies the position used to scale the delay; it may be nil.
peer PeerInfo
// timeout is the delay applied per unit of peer position.
timeout time.Duration
}

// NewWaitStage builds a WaitStage whose delay is the position reported by p
// multiplied by peerTimeout. p may be nil, in which case the stage never
// waits.
func NewWaitStage(p PeerInfo, peerTimeout time.Duration) *WaitStage {
	stage := new(WaitStage)
	stage.peer = p
	stage.timeout = peerTimeout
	return stage
}

// Exec implements the Stage interface.
//
// It blocks for peer.Position() * timeout and then passes the alerts through
// unchanged. When no peer is configured, or the computed delay is not
// positive, it returns immediately without waiting. If ctx is done before the
// delay elapses, it returns nil alerts together with ctx.Err().
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if ws.peer == nil {
		return ctx, alerts, nil
	}
	peerPosition := ws.peer.Position()
	wait := time.Duration(peerPosition) * ws.timeout
	// A non-positive delay (position 0, or a negative position/timeout) means
	// there is nothing to wait for. Returning here avoids arming a timer that
	// fires immediately and racing it against ctx.Done().
	if wait <= 0 {
		return ctx, alerts, nil
	}

	t := time.NewTimer(wait)
	defer t.Stop()

	select {
	case <-t.C:
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}

	// The group key and pipeline time are only used to enrich the debug log
	// line below, so their absence from the context is deliberately ignored.
	gkey, _ := notify.GroupKey(ctx)
	timeNow, _ := notify.Now(ctx)
	level.Debug(l).Log(
		"msg", "continue pipeline after waiting",
		"aggrGroup", gkey,
		"timeout", wait,
		"peer_position", peerPosition,
		"pipeline_time", timeNow,
	)
	return ctx, alerts, nil
}
83 changes: 83 additions & 0 deletions notify/stages/wait_stage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package stages

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/alertmanager/types"
"github.com/stretchr/testify/assert"
)

// mockPeer is a test double that reports a fixed position.
type mockPeer struct {
	position int
}

// Position returns the position the mock was configured with.
func (p *mockPeer) Position() int {
	return p.position
}

// TestWaitStageExec checks that WaitStage.Exec returns immediately when there
// is no peer or the peer is at position 0, blocks for position*timeout
// otherwise, and aborts with the context's error (and no alerts) when the
// context expires before the delay has elapsed.
func TestWaitStageExec(t *testing.T) {
	alerts := []*types.Alert{{}, {}, {}}
	tests := []struct {
		name           string
		peer           PeerInfo
		timeout        time.Duration
		contextTimeout time.Duration
		// minElapsed is the minimum time Exec must block for.
		minElapsed time.Duration
		// maxElapsed, when non-zero, is a bound Exec must return before.
		maxElapsed  time.Duration
		expectedErr error
	}{
		{
			name:        "should not wait if no peer",
			peer:        nil,
			timeout:     10 * time.Second,
			maxElapsed:  time.Second,
			expectedErr: nil,
		},
		{
			name:        "should not wait if with zero position",
			peer:        &mockPeer{position: 0},
			timeout:     10 * time.Second,
			maxElapsed:  time.Second,
			expectedErr: nil,
		},
		{
			name:        "should wait for peer*timeout if peer with non-zero position",
			peer:        &mockPeer{position: 1},
			timeout:     100 * time.Millisecond,
			minElapsed:  100 * time.Millisecond,
			expectedErr: nil,
		},
		{
			name:           "Context timeout",
			peer:           &mockPeer{position: 2},
			timeout:        time.Second,
			contextTimeout: 100 * time.Millisecond,
			// The full delay would be 2s; the context must cut it short.
			maxElapsed:  2 * time.Second,
			expectedErr: context.DeadlineExceeded,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()
			if tc.contextTimeout > 0 {
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, tc.contextTimeout)
				defer cancel()
			}

			logger := log.NewNopLogger()
			ws := NewWaitStage(tc.peer, tc.timeout)

			start := time.Now()
			gotCtx, gotAlerts, gotErr := ws.Exec(ctx, logger, alerts...)
			elapsed := time.Since(start)

			assert.Equal(t, ctx, gotCtx)
			// A timer never fires early, so the lower bound is not flaky.
			if elapsed < tc.minElapsed {
				t.Errorf("Exec returned after %s, expected it to block for at least %s", elapsed, tc.minElapsed)
			}
			if tc.maxElapsed > 0 && elapsed >= tc.maxElapsed {
				t.Errorf("Exec returned after %s, expected it to return in under %s", elapsed, tc.maxElapsed)
			}

			if tc.expectedErr != nil {
				assert.ErrorIs(t, gotErr, tc.expectedErr)
				assert.Nil(t, gotAlerts)
				return
			}
			assert.NoError(t, gotErr)
			assert.Equal(t, alerts, gotAlerts)
		})
	}
}

0 comments on commit 20a5381

Please sign in to comment.