review: add "scheduler" and "nopScheduler" types
Instead of using the `dryRun` flag to control whether or not operations
are added to the (standard) scheduler, we now select the type of
scheduler passed to the timelock worker service:

* if dryRun is false, use the standard scheduler
* if dryRun is true, use the new "nop" scheduler, which only logs the
  calls but does not execute anything

In practice the "standard scheduler" is a new type + interface as well,
since the existing implementation defined the scheduler as a simple
data type associated with the timelock worker via implicit composition
(though all the scheduler-related methods were defined on the timelock
worker type).
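
In effect, the selection in `NewTimelockWorker` reduces to the following sketch (condensed from the diff below):

```go
// Condensed from pkg/timelock/timelock.go: the dry-run flag now picks the
// Scheduler implementation once, up front; the rest of the worker calls the
// interface and no longer needs to check dryRun at each call site.
if dryRun {
    tWorker.scheduler = newNopScheduler(logger)
} else {
    tWorker.scheduler = newScheduler(time.Duration(pollPeriod)*time.Second, logger, tWorker.execute)
}
```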
gustavogama-cll committed Nov 19, 2024
1 parent 26394ab commit 03bc8d7
Showing 4 changed files with 130 additions and 71 deletions.
89 changes: 68 additions & 21 deletions pkg/timelock/scheduler.go
@@ -16,26 +16,39 @@ import (

type operationKey [32]byte

+type Scheduler interface {
+    runScheduler(ctx context.Context) <-chan struct{}
+    addToScheduler(op *contract.TimelockCallScheduled)
+    delFromScheduler(op operationKey)
+    dumpOperationStore(now func() time.Time)
+}
+
+type executeFn func(context.Context, []*contract.TimelockCallScheduled)

// scheduler represents a scheduler with an in-memory store.
// Whenever accessing the map, the mutex should be locked to prevent
// any race condition.
type scheduler struct {
-    mu     sync.Mutex
-    ticker *time.Ticker
-    add    chan *contract.TimelockCallScheduled
-    del    chan operationKey
-    store  map[operationKey][]*contract.TimelockCallScheduled
-    busy   bool
+    mu        sync.Mutex
+    ticker    *time.Ticker
+    add       chan *contract.TimelockCallScheduled
+    del       chan operationKey
+    store     map[operationKey][]*contract.TimelockCallScheduled
+    busy      bool
+    logger    *zerolog.Logger
+    executeFn executeFn
}

// newScheduler returns a new initialized scheduler.
-func newScheduler(tick time.Duration) *scheduler {
+func newScheduler(tick time.Duration, logger *zerolog.Logger, executeFn executeFn) *scheduler {
s := &scheduler{
-        ticker: time.NewTicker(tick),
-        add:    make(chan *contract.TimelockCallScheduled),
-        del:    make(chan operationKey),
-        store:  make(map[operationKey][]*contract.TimelockCallScheduled),
-        busy:   false,
+        ticker:    time.NewTicker(tick),
+        add:       make(chan *contract.TimelockCallScheduled),
+        del:       make(chan operationKey),
+        store:     make(map[operationKey][]*contract.TimelockCallScheduled),
+        busy:      false,
+        logger:    logger,
+        executeFn: executeFn,
}

return s
@@ -48,7 +61,7 @@ func newScheduler(tick time.Duration) *scheduler {
// call them this way so no process is allowed to add/delete from
// the store, which could cause race conditions like adding/deleting
// while the operation is being executed.
-func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
+func (tw *scheduler) runScheduler(ctx context.Context) <-chan struct{} {
done := make(chan struct{})

go func() {
@@ -65,7 +78,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
tw.logger.Debug().Msgf("new scheduler tick: operations in store")
tw.setSchedulerBusy()
for _, op := range tw.store {
-            tw.execute(ctx, op)
+            tw.executeFn(ctx, op)
}
tw.setSchedulerFree()
} else {
@@ -100,7 +113,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
}

// updateSchedulerDelay updates the internal ticker delay, so it can be reconfigured while running.
-func (tw *Worker) updateSchedulerDelay(t time.Duration) {
+func (tw *scheduler) updateSchedulerDelay(t time.Duration) {
if t <= 0 {
tw.logger.Debug().Msgf("internal min delay not changed, invalid duration: %v", t.String())
return
@@ -111,7 +124,7 @@ func (tw *Worker) updateSchedulerDelay(t time.Duration) {
}

// addToScheduler adds a new CallSchedule operation safely to the store.
-func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
+func (tw *scheduler) addToScheduler(op *contract.TimelockCallScheduled) {
tw.mu.Lock()
defer tw.mu.Unlock()
tw.logger.Debug().Msgf("scheduling operation: %x", op.Id)
@@ -120,35 +133,35 @@ func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
}

// delFromScheduler deletes an operation safely from the store.
-func (tw *Worker) delFromScheduler(op operationKey) {
+func (tw *scheduler) delFromScheduler(op operationKey) {
tw.mu.Lock()
defer tw.mu.Unlock()
tw.logger.Debug().Msgf("de-scheduling operation: %v", op)
tw.del <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

-func (tw *Worker) setSchedulerBusy() {
+func (tw *scheduler) setSchedulerBusy() {
tw.logger.Debug().Msgf("setting scheduler busy")
tw.mu.Lock()
tw.busy = true
tw.mu.Unlock()
}

-func (tw *Worker) setSchedulerFree() {
+func (tw *scheduler) setSchedulerFree() {
tw.logger.Debug().Msgf("setting scheduler free")
tw.mu.Lock()
tw.busy = false
tw.mu.Unlock()
}

-func (tw *Worker) isSchedulerBusy() bool {
+func (tw *scheduler) isSchedulerBusy() bool {
return tw.busy
}

// dumpOperationStore dumps the currently scheduled, unexecuted operations to the logger and to the log file.
// Maps in Go don't guarantee order, which is why we have to find the earliest block.
-func (tw *Worker) dumpOperationStore(now func() time.Time) {
+func (tw *scheduler) dumpOperationStore(now func() time.Time) {
if len(tw.store) <= 0 {
tw.logger.Info().Msgf("no operations to dump")
return
@@ -250,3 +263,37 @@ func toEarliestRecord(op *contract.TimelockCallScheduled) string {
func toSubsequentRecord(op *contract.TimelockCallScheduled) string {
return fmt.Sprintf("CallSchedule pending ID: %x\tBlock Number: %v\n", op.Id, op.Raw.BlockNumber)
}

+// ----- nop scheduler -----
+// nopScheduler implements the Scheduler interface but does not actually trigger any operations.
+type nopScheduler struct {
+    logger *zerolog.Logger
+}
+
+func newNopScheduler(logger *zerolog.Logger) *nopScheduler {
+    return &nopScheduler{logger: logger}
+}
+
+func (s *nopScheduler) runScheduler(ctx context.Context) <-chan struct{} {
+    s.logger.Info().Msg("nop.runScheduler")
+    ch := make(chan struct{})
+
+    go func() {
+        <-ctx.Done()
+        close(ch)
+    }()
+
+    return ch
+}
+
+func (s *nopScheduler) addToScheduler(op *contract.TimelockCallScheduled) {
+    s.logger.Info().Any("op", op).Msg("nop.addToScheduler")
+}
+
+func (s *nopScheduler) delFromScheduler(key operationKey) {
+    s.logger.Info().Any("key", key).Msg("nop.delFromScheduler")
+}
+
+func (s *nopScheduler) dumpOperationStore(now func() time.Time) {
+    s.logger.Info().Msg("nop.dumpOperationStore")
+}
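
For context, this is how the dry-run path behaves with the new types — a minimal sketch, assuming code inside the `timelock` package; the `op` and `key` values are hypothetical placeholders:

```go
// Sketch: dry-run flow with the nop scheduler (hypothetical usage).
logger := zerolog.Nop()
var s Scheduler = newNopScheduler(&logger)

ctx, cancel := context.WithCancel(context.Background())
done := s.runScheduler(ctx) // no ticker loop; just waits for ctx

s.addToScheduler(op)    // logs "nop.addToScheduler"; nothing is stored
s.delFromScheduler(key) // logs "nop.delFromScheduler"; nothing is deleted

cancel()
<-done // channel is closed once ctx is cancelled
```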
71 changes: 38 additions & 33 deletions pkg/timelock/scheduler_test.go
@@ -1,6 +1,7 @@
package timelock

import (
"context"
"fmt"
"os"
"reflect"
@@ -16,7 +17,9 @@ import (
)

func Test_newScheduler(t *testing.T) {
-    tScheduler := newScheduler(10 * time.Second)
+    logger := zerolog.Nop()
+    execFn := func(context.Context, []*contract.TimelockCallScheduled) {}
+    tScheduler := newTestScheduler()

type args struct {
tick time.Duration
@@ -36,63 +39,60 @@ func Test_newScheduler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-            got := newScheduler(tt.args.tick)
+            got := newScheduler(tt.args.tick, &logger, execFn)
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("newScheduler() = %v, want %v", got, tt.want)
}
})
}
}

-func TestWorker_updateSchedulerDelay(t *testing.T) {
-    testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
-        testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
+func Test_scheduler_updateSchedulerDelay(t *testing.T) {
+    tScheduler := newTestScheduler()

// Should never fail
-    testWorker.updateSchedulerDelay(1 * time.Second)
-    testWorker.updateSchedulerDelay(-1 * time.Second)
-    testWorker.updateSchedulerDelay(0 * time.Second)
+    tScheduler.updateSchedulerDelay(1 * time.Second)
+    tScheduler.updateSchedulerDelay(-1 * time.Second)
+    tScheduler.updateSchedulerDelay(0 * time.Second)
}

-func TestWorker_isSchedulerBusy(t *testing.T) {
-    testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
-        testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
+func Test_scheduler_isSchedulerBusy(t *testing.T) {
+    tScheduler := newTestScheduler()

-    isBusy := testWorker.isSchedulerBusy()
+    isBusy := tScheduler.isSchedulerBusy()
assert.Equal(t, false, isBusy, "scheduler should be busy by default")

-    testWorker.setSchedulerBusy()
-    isBusy = testWorker.isSchedulerBusy()
+    tScheduler.setSchedulerBusy()
+    isBusy = tScheduler.isSchedulerBusy()
assert.Equal(t, true, isBusy, "scheduler should be busy after setSchedulerBusy()")

-    testWorker.setSchedulerFree()
-    isBusy = testWorker.isSchedulerBusy()
+    tScheduler.setSchedulerFree()
+    isBusy = tScheduler.isSchedulerBusy()
assert.Equal(t, false, isBusy, "scheduler shouldn't be busy after setSchedulerFree()")
}

-func TestWorker_setSchedulerBusy(t *testing.T) {
-    testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
-        testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
+func Test_scheduler_setSchedulerBusy(t *testing.T) {
+    tScheduler := newTestScheduler()

-    testWorker.setSchedulerBusy()
-    isBusy := testWorker.isSchedulerBusy()
+    tScheduler.setSchedulerBusy()
+    isBusy := tScheduler.isSchedulerBusy()
assert.Equal(t, true, isBusy, "scheduler should be busy after setSchedulerBusy()")
}

-func TestWorker_setSchedulerFree(t *testing.T) {
-    testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
-        testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
+func Test_scheduler_setSchedulerFree(t *testing.T) {
+    logger := zerolog.Nop()
+    execFn := func(context.Context, []*contract.TimelockCallScheduled) {}
+    tScheduler := newScheduler(10 * time.Second, &logger, execFn)

-    testWorker.setSchedulerFree()
-    isBusy := testWorker.isSchedulerBusy()
+    tScheduler.setSchedulerFree()
+    isBusy := tScheduler.isSchedulerBusy()
assert.Equal(t, false, isBusy, "scheduler shouldn't be busy after setSchedulerFree()")
}

// Test_dumpOperationStore tests the dumpOperationStore method and ensures that it writes the correct contents to the file.
func Test_dumpOperationStore(t *testing.T) {
var (
fName = logPath + logFile
-        logger = zerolog.Nop()
earliestBlock = 42
opKeys = generateOpKeys(t, []string{"1", "2"})

@@ -117,11 +117,8 @@ func Test_dumpOperationStore(t *testing.T) {
opKeys[1]: {following},
}

-        worker = &Worker{
-            logger: &logger,
-            scheduler: scheduler{
-                store: store,
-            },
-        }
+        scheduler = scheduler{
+            store: store,
+        }
)

@@ -139,7 +136,7 @@ func Test_dumpOperationStore(t *testing.T) {
wantPrefix := fmt.Sprintf("Process stopped at %v\n", nowFunc().In(time.UTC))

// Write the store to the file.
-    worker.dumpOperationStore(nowFunc)
+    scheduler.dumpOperationStore(nowFunc)

// Read the file and compare the contents.
gotRead, err := os.ReadFile(fName)
@@ -153,6 +150,14 @@ func Test_dumpOperationStore(t *testing.T) {
assert.Equal(t, wantRead, gotRead)
}

+// ----- helpers -----
+
+func newTestScheduler() *scheduler {
+    logger := zerolog.Nop()
+    execFn := func(context.Context, []*contract.TimelockCallScheduled) {}
+    return newScheduler(10 * time.Second, &logger, execFn)
+}

// generateOpKeys generates a slice of operation keys from a slice of strings.
func generateOpKeys(t *testing.T, in []string) [][32]byte {
t.Helper()
25 changes: 12 additions & 13 deletions pkg/timelock/timelock.go
@@ -39,7 +39,7 @@ type Worker struct {
dryRun bool
logger *zerolog.Logger
privateKey *ecdsa.PrivateKey
-    scheduler
+    scheduler Scheduler
}

var httpSchemes = []string{"http", "https"}
Expand Down Expand Up @@ -130,7 +130,12 @@ func NewTimelockWorker(
dryRun: dryRun,
logger: logger,
privateKey: privateKeyECDSA,
-        scheduler:  *newScheduler(time.Duration(pollPeriod) * time.Second),
    }
+
+    if dryRun {
+        tWorker.scheduler = newNopScheduler(logger)
+    } else {
+        tWorker.scheduler = newScheduler(time.Duration(pollPeriod)*time.Second, logger, tWorker.execute)
+    }

return tWorker, nil
@@ -145,7 +150,7 @@ func (tw *Worker) Listen(ctx context.Context) error {
tw.startLog()

// Run the scheduler to add/del operations in a thread-safe way.
-    schedulingDone := tw.runScheduler(ctxwc)
+    schedulingDone := tw.scheduler.runScheduler(ctxwc)

// Retrieve historical logs.
historyDone, historyCh, err := tw.retrieveHistoricalLogs(ctxwc)
@@ -176,7 +181,7 @@ func (tw *Worker) Listen(ctx context.Context) error {

tw.logger.Info().Msg("shutting down timelock-worker")
tw.logger.Info().Msg("dumping operation store")
-    tw.dumpOperationStore(time.Now)
+    tw.scheduler.dumpOperationStore(time.Now)

// Wait for all goroutines to finish.
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
@@ -451,9 +456,7 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if !isDone(ctx, tw.contract, cs.Id) && isOperation(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received", eventCallScheduled)
-        if !tw.dryRun {
-            tw.addToScheduler(cs)
-        }
+        tw.scheduler.addToScheduler(cs)
}

// A CallExecuted which is in Done status should delete the task in the scheduler store.
Expand All @@ -465,9 +468,7 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if isDone(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received, skipping operation", eventCallExecuted)
-        if !tw.dryRun {
-            tw.delFromScheduler(cs.Id)
-        }
+        tw.scheduler.delFromScheduler(cs.Id)
}

// A Cancelled which is in Done status should delete the task in the scheduler store.
Expand All @@ -479,9 +480,7 @@ func (tw *Worker) handleLog(ctx context.Context, log types.Log) error {

if isDone(ctx, tw.contract, cs.Id) {
tw.logger.Info().Hex(fieldTXHash, cs.Raw.TxHash[:]).Uint64(fieldBlockNumber, cs.Raw.BlockNumber).Msgf("%s received, cancelling operation", eventCancelled)
-        if !tw.dryRun {
-            tw.delFromScheduler(cs.Id)
-        }
+        tw.scheduler.delFromScheduler(cs.Id)
}
default:
tw.logger.Info().Str("event", event.Name).Msgf("discarding event")