Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support activity retry policies #83

Merged
merged 9 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package task

import (
"context"
"fmt"
"math"
"time"

"github.com/microsoft/durabletask-go/internal/protos"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -10,7 +13,19 @@ import (
type callActivityOption func(*callActivityOptions) error

type callActivityOptions struct {
rawInput *wrapperspb.StringValue
rawInput *wrapperspb.StringValue
retryPolicy *ActivityRetryPolicy
}

type ActivityRetryPolicy struct {
// Max number of attempts to try the activity call, first execution inclusive
MaxAttempts int
//
cgillum marked this conversation as resolved.
Show resolved Hide resolved
InitialRetryInterval time.Duration
BackoffCoefficient float64
MaxRetryInterval time.Duration
RetryTimeout time.Duration
Handle func(error) bool
}

// WithActivityInput configures an input for an activity invocation.
Expand All @@ -34,6 +49,37 @@ func WithRawActivityInput(input string) callActivityOption {
}
}

func WithRetryPolicy(policy *ActivityRetryPolicy) callActivityOption {
return func(opt *callActivityOptions) error {
if policy == nil {
return nil
}
if policy.InitialRetryInterval <= 0 {
return fmt.Errorf("InitialRetryInterval must be greater than 0")
}
if policy.MaxAttempts <= 0 {
// setting 1 max attempt is equivalent to not retrying
policy.MaxAttempts = 1
}
if policy.BackoffCoefficient <= 0 {
policy.BackoffCoefficient = 1
}
if policy.MaxRetryInterval <= 0 {
policy.MaxRetryInterval = math.MaxInt64
}
if policy.RetryTimeout <= 0 {
policy.RetryTimeout = math.MaxInt64
}
if policy.Handle == nil {
policy.Handle = func(err error) bool {
return true
}
}
opt.retryPolicy = policy
return nil
}
}

// ActivityContext is the context parameter type for activity implementations.
type ActivityContext interface {
GetInput(resultPtr any) error
Expand Down
61 changes: 61 additions & 0 deletions task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/list"
"encoding/json"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -236,6 +237,16 @@ func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...call
}
}

if options.retryPolicy != nil {
return ctx.internalCallActivityWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalScheduleActivity(activity, options)
}, *options.retryPolicy, 0)
}

return ctx.internalScheduleActivity(activity, options)
}

func (ctx *OrchestrationContext) internalScheduleActivity(activity interface{}, options *callActivityOptions) Task {
scheduleTaskAction := helpers.NewScheduleTaskAction(
ctx.getNextSequenceNumber(),
helpers.GetTaskFunctionName(activity),
Expand All @@ -248,6 +259,56 @@ func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...call
return task
}

func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt time.Time, schedule func() Task, policy ActivityRetryPolicy, retryCount int) Task {
return &taskWrapper{
delegate: schedule(),
onAwaitResult: func(v any, err error) error {
if err == nil {
return nil
}

if retryCount+1 >= policy.MaxAttempts {
// next try will exceed the max attempts, dont continue
return err
}

nextDelay := computeNextDelay(ctx.CurrentTimeUtc, policy, retryCount, initialAttempt, err)
if nextDelay == 0 {
return err
}

timerErr := ctx.createTimerInternal(nextDelay).Await(nil)
if timerErr != nil {
// TODO use errors.Join when updating golang
return fmt.Errorf("%v %w", timerErr, err)
}

err = ctx.internalCallActivityWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v)
if err == nil {
return nil
}
return err
},
}
}

func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
if policy.Handle(err) {
isExpired := false
if policy.RetryTimeout != math.MaxInt64 {
isExpired = currentTimeUtc.After(firstAttempt.Add(policy.RetryTimeout))
}
if !isExpired {
nextDelayMs := float64(policy.InitialRetryInterval.Milliseconds()) * math.Pow(policy.BackoffCoefficient, float64(attempt))
if nextDelayMs < float64(policy.MaxRetryInterval.Milliseconds()) {
return time.Duration(int64(nextDelayMs) * int64(time.Millisecond))
}
return policy.MaxRetryInterval
}
}
return 0
}

func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
Expand Down
133 changes: 133 additions & 0 deletions task/orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package task

import (
"testing"
"time"
)

func Test_computeNextDelay(t *testing.T) {
time1 := time.Now()
time2 := time.Now().Add(1 * time.Minute)
type args struct {
currentTimeUtc time.Time
policy ActivityRetryPolicy
attempt int
firstAttempt time.Time
err error
}
tests := []struct {
name string
args args
want time.Duration
}{
{
name: "first attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 2 * time.Minute,
},
attempt: 0,
firstAttempt: time1,
},
want: 2 * time.Second,
},
{
name: "second attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 2 * time.Minute,
},
attempt: 1,
firstAttempt: time1,
},
want: 4 * time.Second,
},
{
name: "third attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 2 * time.Minute,
},
attempt: 2,
firstAttempt: time1,
},
want: 8 * time.Second,
},
{
name: "fourth attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 2 * time.Minute,
},
attempt: 3,
firstAttempt: time1,
},
want: 10 * time.Second,
},
{
name: "expired",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 30 * time.Second,
},
attempt: 3,
firstAttempt: time1,
},
want: 0,
},
{
name: "fourth attempt backoff 1",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 1,
MaxRetryInterval: 10 * time.Second,
Handle: func(err error) bool { return true },
RetryTimeout: 2 * time.Minute,
},
attempt: 3,
firstAttempt: time1,
},
want: 2 * time.Second,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := computeNextDelay(tt.args.currentTimeUtc, tt.args.policy, tt.args.attempt, tt.args.firstAttempt, tt.args.err); got != tt.want {
t.Errorf("computeNextDelay() = %v, want %v", got, tt.want)
}
})
}
}
11 changes: 11 additions & 0 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,14 @@ func (t *completableTask) completeInternal() {
t.completedCallback()
}
}

type taskWrapper struct {
delegate Task
onAwaitResult func(any, error) error
}

var _ Task = &taskWrapper{}

func (t *taskWrapper) Await(v any) error {
return t.onAwaitResult(v, t.delegate.Await(v))
}
Loading