feat: add "pagination" to http-based event log poller
Modify the HTTP log poller to fetch logs in pages (chunks of blocks).

Instead of trying to fetch all the logs from the RPC in one go (which
fails if there are too many blocks between the specified "initial block"
and the latest one), we now fetch logs in sequential "pages" of N blocks.
For instance, say we're using a page size of 1000:

```
# before
GetLogs(FromBlock: 0, ToBlock: "<CurrentBlock>")

# after
GetLogs(FromBlock:     0, ToBlock: 1000)
GetLogs(FromBlock:  1000, ToBlock: 2000)
GetLogs(FromBlock:  2000, ToBlock: 3000)
...
GetLogs(FromBlock: 99000, ToBlock: "<CurrentBlock>")
```

The page size is controlled by a new flag "event-listener-poll-size" (or
the corresponding environment variable "EVENT_LISTENER_POLL_SIZE").
The recommended value based on tests with Monad is 1000.
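
For reference, here is a minimal, self-contained sketch of the same paging idea
using go-ethereum's ethclient. It is an illustration only, not the commit's
implementation (which lives in the worker's fetchAndDispatchLogs): the function
name "fetchLogsPaged" and its parameters are invented for this example, and
"pageSize" stands in for the "event-listener-poll-size" value.

```go
package logpager

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// fetchLogsPaged walks from fromBlock up to the current chain head in windows
// of pageSize blocks, issuing one eth_getLogs request per window instead of a
// single request covering the whole range.
func fetchLogsPaged(
	ctx context.Context, client *ethclient.Client, address common.Address,
	fromBlock, pageSize uint64,
) ([]types.Log, error) {
	head, err := client.BlockNumber(ctx) // current chain block
	if err != nil {
		return nil, err
	}

	var all []types.Log
	for start := fromBlock; start <= head; start += pageSize {
		end := min(start+pageSize, head) // the last page is capped at the chain head
		logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
			Addresses: []common.Address{address},
			FromBlock: new(big.Int).SetUint64(start),
			ToBlock:   new(big.Int).SetUint64(end),
		})
		if err != nil {
			return nil, err
		}
		all = append(all, logs...)
	}
	return all, nil
}
```

Note that consecutive windows share a boundary block (0-1000, 1000-2000, ...),
mirroring the ranges in the example above.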
gustavogama-cll committed Feb 1, 2025
1 parent 3676476 commit f3a54ee
Showing 6 changed files with 121 additions and 25 deletions.
9 changes: 8 additions & 1 deletion cmd/start.go
@@ -20,6 +20,7 @@ func startCommand() *cobra.Command {

nodeURL, privateKey, timelockAddress, callProxyAddress string
fromBlock, pollPeriod, eventListenerPollPeriod int64
eventListenerPollSize uint64
dryRun bool
)

@@ -42,6 +43,7 @@ func startCommand() *cobra.Command {
startCmd.Flags().Int64Var(&fromBlock, "from-block", timelockConf.FromBlock, "Start watching from this block")
startCmd.Flags().Int64Var(&pollPeriod, "poll-period", timelockConf.PollPeriod, "Poll period in seconds")
startCmd.Flags().Int64Var(&eventListenerPollPeriod, "event-listener-poll-period", timelockConf.EventListenerPollPeriod, "Event Listener poll period in seconds")
startCmd.Flags().Uint64Var(&eventListenerPollSize, "event-listener-poll-size", timelockConf.EventListenerPollSize, "Number of blocks to fetch per page when polling logs")
startCmd.Flags().BoolVar(&dryRun, "dry-run", timelockConf.DryRun, "Enable \"dry run\" mode -- monitor events but don't trigger any calls")

return &startCmd
@@ -91,13 +93,18 @@ func startTimelock(cmd *cobra.Command) {
slog.Fatalf("value of poll-period not set: %s", err.Error())
}

eventListenerPollSize, err := cmd.Flags().GetUint64("event-listener-poll-size")
if err != nil {
slog.Fatalf("value of event-listener-poll-size not set: %s", err.Error())
}

dryRun, err := cmd.Flags().GetBool("dry-run")
if err != nil {
slog.Fatalf("value of dry-run not set: %s", err.Error())
}

tWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey,
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, dryRun, slog)
big.NewInt(fromBlock), pollPeriod, eventListenerPollPeriod, eventListenerPollSize, dryRun, slog)
if err != nil {
slog.Fatalf("error creating the timelock-worker: %s", err.Error())
}
10 changes: 10 additions & 0 deletions pkg/cli/config.go
@@ -19,6 +19,7 @@ type Config struct {
FromBlock int64 `mapstructure:"FROM_BLOCK"`
PollPeriod int64 `mapstructure:"POLL_PERIOD"`
EventListenerPollPeriod int64 `mapstructure:"EVENT_LISTENER_POLL_PERIOD"`
EventListenerPollSize uint64 `mapstructure:"EVENT_LISTENER_POLL_SIZE"`
DryRun bool `mapstructure:"DRY_RUN"`
}

@@ -83,6 +84,15 @@ func NewTimelockCLI() (*Config, error) {
c.EventListenerPollPeriod = int64(pp)
}

if os.Getenv("EVENT_LISTENER_POLL_SIZE") != "" {
pp, err := strconv.Atoi(os.Getenv("EVENT_LISTENER_POLL_SIZE"))
if err != nil {
return nil, fmt.Errorf("unable to parse EVENT_LISTENER_POLL_SIZE value: %w", err)
}

c.EventListenerPollSize = uint64(pp) //nolint:gosec
}

if os.Getenv("DRY_RUN") != "" {
trueValues := []string{"true", "yes", "on", "enabled", "1"}
c.DryRun = slices.Contains(trueValues, strings.ToLower(os.Getenv("DRY_RUN")))
1 change: 1 addition & 0 deletions pkg/timelock/const_test.go
@@ -16,6 +16,7 @@ var (
testFromBlock = big.NewInt(0)
testPollPeriod = 5
testEventListenerPollPeriod = 1
testEventListenerPollSize = uint64(10)
testDryRun = false
testLogger = lo.Must(logger.NewLogger("info", "human")).Sugar()
)
50 changes: 38 additions & 12 deletions pkg/timelock/timelock.go
@@ -36,6 +36,7 @@ type Worker struct {
fromBlock *big.Int
pollPeriod int64
listenerPollPeriod int64
pollSize uint64
dryRun bool
logger *zap.SugaredLogger
privateKey *ecdsa.PrivateKey
@@ -50,7 +51,7 @@ var validNodeUrlSchemes = []string{"http", "https", "ws", "wss"}
// It's a singleton, so further executions will retrieve the same timelockWorker.
func NewTimelockWorker(
nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
pollPeriod int64, listenerPollPeriod int64, dryRun bool, logger *zap.SugaredLogger,
pollPeriod int64, listenerPollPeriod int64, pollSize uint64, dryRun bool, logger *zap.SugaredLogger,
) (*Worker, error) {
// Sanity check on each provided variable before allocating more resources.
u, err := url.ParseRequestURI(nodeURL)
@@ -78,6 +79,10 @@ func NewTimelockWorker(
return nil, fmt.Errorf("event-listener-poll-period must be a positive non-zero integer: got %d", listenerPollPeriod)
}

if slices.Contains(httpSchemes, u.Scheme) && pollSize == 0 {
return nil, fmt.Errorf("event-listener-poll-size must be a positive non-zero integer: got %d", pollSize)
}

if fromBlock.Int64() < big.NewInt(0).Int64() {
return nil, fmt.Errorf("from block can't be a negative number (minimum value 0): got %d", fromBlock.Int64())
}
@@ -127,6 +132,7 @@ func NewTimelockWorker(
fromBlock: fromBlock,
pollPeriod: pollPeriod,
listenerPollPeriod: listenerPollPeriod,
pollSize: pollSize,
dryRun: dryRun,
logger: logger,
privateKey: privateKeyECDSA,
@@ -193,10 +199,11 @@ func (tw *Worker) Listen(ctx context.Context) error {
}

// setupFilterQuery returns an ethereum.FilterQuery initialized to watch the Timelock contract.
func (tw *Worker) setupFilterQuery(fromBlock *big.Int) ethereum.FilterQuery {
func (tw *Worker) setupFilterQuery(fromBlock, toBlock *big.Int) ethereum.FilterQuery {
return ethereum.FilterQuery{
Addresses: tw.address,
FromBlock: fromBlock,
ToBlock: toBlock,
}
}

@@ -216,7 +223,7 @@ func (tw *Worker) retrieveNewLogs(ctx context.Context) (<-chan struct{}, <-chan

// subscribeNewLogs subscribes to a Timelock contract and emit logs through the channel it returns.
func (tw *Worker) subscribeNewLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
query := tw.setupFilterQuery(tw.fromBlock)
query := tw.setupFilterQuery(tw.fromBlock, nil)
logCh := make(chan types.Log)
done := make(chan struct{})

@@ -291,7 +298,7 @@ func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan type
defer ticker.Stop()

for {
lastBlock = tw.fetchAndDispatchLogs(ctx, logCh, lastBlock)
lastBlock = tw.fetchAndDispatchLogs(ctx, logCh, lastBlock, nil)

select {
case <-ticker.C:
@@ -311,7 +318,7 @@ func (tw *Worker) pollNewLogs(ctx context.Context) (<-chan struct{}, <-chan type
// retrieveHistoricalLogs returns a types.Log channel and retrieves all the historical events of a given contract.
// Once all the logs have been sent into the channel the function returns and the channel is closed.
func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{}, <-chan types.Log, error) {
query := tw.setupFilterQuery(tw.fromBlock)
query := tw.setupFilterQuery(tw.fromBlock, nil)
logCh := make(chan types.Log)
done := make(chan struct{})

@@ -352,30 +359,48 @@ func (tw *Worker) retrieveHistoricalLogs(ctx context.Context) (<-chan struct{},
return done, logCh, nil
}

func (tw *Worker) fetchAndDispatchLogs(ctx context.Context, logCh chan types.Log, lastBlock *big.Int) *big.Int {
query := tw.setupFilterQuery(lastBlock)
func (tw *Worker) fetchAndDispatchLogs(
ctx context.Context, logCh chan types.Log, fromBlock, currentChainBlock *big.Int,
) *big.Int {
if currentChainBlock == nil {
blockNumber, err := tw.ethClient.BlockNumber(ctx)
if err != nil {
tw.logger.With("error", err).Error("unable to fetch current block number from eth client")
}
currentChainBlock = new(big.Int).SetUint64(blockNumber)
}
toBlock := new(big.Int).SetUint64(min(currentChainBlock.Uint64(), fromBlock.Uint64()+tw.pollSize))

query := tw.setupFilterQuery(fromBlock, toBlock)
tw.logger.Debugf("fetching logs from block %v to block %v", query.FromBlock, query.ToBlock)
logs, err := tw.ethClient.FilterLogs(ctx, query)
if err != nil {
tw.logger.With("error", err).Error("unable to fetch logs from eth client")
SetReadyStatus(HealthStatusError) // FIXME(gustavogama-cll): wait for N errors before setting status

return lastBlock
return fromBlock
}
tw.logger.Debugf("fetched %d log entries starting from block %d", len(logs), lastBlock)

tw.logger.Debugf("fetched %d log entries from block %d to block %d", len(logs), fromBlock, toBlock)
SetReadyStatus(HealthStatusOK)

for _, log := range logs {
lastBlock = new(big.Int).SetUint64(max(lastBlock.Uint64(), log.BlockNumber+1))
select {
case logCh <- log:
tw.logger.With("log", log).Debug("dispatching log")
case <-ctx.Done():
tw.logger.Debug("stopped while dispatching logs: incomplete retrieval.")
break
return toBlock
}
}

return lastBlock
if toBlock.Cmp(currentChainBlock) < 0 {
// we haven't reached the current block; re-run same procedure with
// the 'toBlock` as the start block
return tw.fetchAndDispatchLogs(ctx, logCh, toBlock, currentChainBlock)
}

return toBlock
}

// processLogs is implemented as a fan-in for all the logs channels, merging all the data and handling logs sequentially.
Expand Down Expand Up @@ -512,4 +537,5 @@ func (tw *Worker) startLog() {
tw.logger.Infof("\tStarting from block: %v", tw.fromBlock)
tw.logger.Infof("\tPoll Period: %v", time.Duration(tw.pollPeriod*int64(time.Second)).String())
tw.logger.Infof("\tEvent Listener Poll Period: %v", time.Duration(tw.listenerPollPeriod*int64(time.Second)).String())
tw.logger.Infof("\tEvent Listener Poll # Logs%v", tw.pollSize)
}
17 changes: 13 additions & 4 deletions pkg/timelock/timelock_test.go
@@ -13,7 +13,8 @@ import (

func newTestTimelockWorker(
t *testing.T, nodeURL, timelockAddress, callProxyAddress, privateKey string, fromBlock *big.Int,
pollPeriod int64, eventListenerPollPeriod int64, dryRun bool, logger *zap.SugaredLogger,
pollPeriod int64, eventListenerPollPeriod int64, eventListenerPollSize uint64, dryRun bool,
logger *zap.SugaredLogger,
) *Worker {
assert.NotEmpty(t, nodeURL, "nodeURL is empty. Are environment variables in const_test.go set?")
assert.NotEmpty(t, timelockAddress, "timelockAddress is empty. Are environment variables in const_test.go set?")
@@ -24,7 +25,7 @@ func newTestTimelockWorker(
assert.NotNil(t, logger, "logger is nil. Are environment variables in const_test.go set?")

tw, err := NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey, fromBlock,
pollPeriod, eventListenerPollPeriod, dryRun, logger)
pollPeriod, eventListenerPollPeriod, eventListenerPollSize, dryRun, logger)
require.NoError(t, err)
require.NotNil(t, tw)

@@ -44,6 +45,7 @@ func TestNewTimelockWorker(t *testing.T) {
fromBlock *big.Int
pollPeriod int64
eventListenerPollPeriod int64
eventListenerPollSize uint64
dryRun bool
logger *zap.SugaredLogger
}
@@ -55,6 +57,7 @@ func TestNewTimelockWorker(t *testing.T) {
fromBlock: big.NewInt(1),
pollPeriod: 900,
eventListenerPollPeriod: 60,
eventListenerPollSize: 1000,
dryRun: false,
logger: zap.NewNop().Sugar(),
}
@@ -108,6 +111,11 @@ func TestNewTimelockWorker(t *testing.T) {
setup: func(a *argsT) { a.eventListenerPollPeriod = -1 },
wantErr: "event-listener-poll-period must be a positive non-zero integer: got -1",
},
{
name: "failure - bad event listener poll size",
setup: func(a *argsT) { a.eventListenerPollSize = 0 },
wantErr: "event-listener-poll-size must be a positive non-zero integer: got 0",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -118,7 +126,7 @@

got, err := NewTimelockWorker(args.nodeURL, args.timelockAddress, args.callProxyAddress,
args.privateKey, args.fromBlock, args.pollPeriod, args.eventListenerPollPeriod,
args.dryRun, args.logger)
args.eventListenerPollSize, args.dryRun, args.logger)

if tt.wantErr == "" {
require.NoError(t, err)
@@ -136,7 +144,8 @@ func TestWorker_startLog(t *testing.T) {
rpcURL := runRPCServer(t)

testWorker := newTestTimelockWorker(t, rpcURL, testTimelockAddress, testCallProxyAddress, testPrivateKey,
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testDryRun, testLogger)
testFromBlock, int64(testPollPeriod), int64(testEventListenerPollPeriod), testEventListenerPollSize,
testDryRun, testLogger)

tests := []struct {
name string
59 changes: 51 additions & 8 deletions tests/integration/timelock_test.go
@@ -3,6 +3,7 @@ package integration
import (
"context"
"math/big"
"strings"
"testing"
"time"

@@ -65,7 +66,7 @@ func (s *integrationTestSuite) TestTimelockWorkerListen() {
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)

go runTimelockWorker(s.T(), sctx, tt.url, timelockAddress.String(), callProxyAddress.String(),
account.HexPrivateKey, big.NewInt(0), int64(60), int64(1), true, logger)
account.HexPrivateKey, big.NewInt(0), int64(60), int64(1), uint64(10), true, logger)

UpdateDelay(s.T(), ctx, transactor, backend, timelockContract, big.NewInt(10))

@@ -123,7 +124,7 @@ func (s *integrationTestSuite) TestTimelockWorkerDryRun() {
s.Require().EventuallyWithT(func(t *assert.CollectT) {
assertLogMessage(t, logs, "scheduling operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
assertLogMessage(t, logs, "scheduled operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
}, 2*time.Second, 100*time.Millisecond)
}, 2*time.Second, 100*time.Millisecond, logMessages(logs))
},
},
}
@@ -139,7 +140,7 @@ func (s *integrationTestSuite) TestTimelockWorkerDryRun() {
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), tctx, transactor, backend, timelockAddress)

go runTimelockWorker(s.T(), tctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), tt.dryRun, logger)
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(10), tt.dryRun, logger)

ScheduleBatch(s.T(), tctx, transactor, backend, timelockContract, calls, [32]byte{}, [32]byte{}, big.NewInt(1))

@@ -168,7 +169,7 @@ func (s *integrationTestSuite) TestTimelockWorkerCancelledEvent() {
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)

go runTimelockWorker(s.T(), ctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), false, logger)
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(10), false, logger)

calls := []contracts.RBACTimelockCall{{
Target: common.HexToAddress("0x000000000000000000000000000000000000000"),
@@ -189,16 +190,50 @@ func (s *integrationTestSuite) TestTimelockWorkerCancelledEvent() {
assertLogMessage(s.T(), logs, "de-scheduled operation: 371141ec10c0cc52996bed94240931136172d0b46bdc4bceaea1ef76675c1237")
}

func (s *integrationTestSuite) TestTimelockWorkerPollSize() {
// --- arrange ---
ctx, cancel := context.WithCancel(s.Ctx)
defer cancel()

account := NewTestAccount(s.T())
_, err := s.GethContainer.CreateAccount(ctx, account.HexAddress, account.HexPrivateKey, 1)
s.Require().NoError(err)
s.Logf("new account created: %v", account)

gethURL := s.GethContainer.HTTPConnStr(s.T(), ctx)
backend := NewRPCBackend(s.T(), ctx, gethURL)
transactor := s.KeyedTransactor(account.PrivateKey, nil)
logger, logs := timelockTests.NewTestLogger()

timelockAddress, _, _, _ := DeployTimelock(s.T(), ctx, transactor, backend,
account.Address, big.NewInt(1))
callProxyAddress, _, _, _ := DeployCallProxy(s.T(), ctx, transactor, backend, timelockAddress)

time.Sleep(1*time.Second) // wait for a few blocks before starting the timelock worker service

// --- act ---
go runTimelockWorker(s.T(), ctx, gethURL, timelockAddress.String(), callProxyAddress.String(),
account.HexPrivateKey, big.NewInt(0), int64(1), int64(1), uint64(2), false, logger)

// --- assert ---
s.Require().EventuallyWithT(func(collect *assert.CollectT) {
assertLogMessage(collect, logs, "fetching logs from block 0 to block 2")
assertLogMessage(collect, logs, "fetching logs from block 2 to block 4")
assertLogMessage(collect, logs, "fetching logs from block 4 to block 6")
}, 2*time.Second, 100*time.Millisecond, logMessages(logs))
}

// ----- helpers -----

func runTimelockWorker(
t *testing.T, ctx context.Context, nodeURL, timelockAddress, callProxyAddress, privateKey string,
fromBlock *big.Int, pollPeriod int64, listenerPollPeriod int64, dryRun bool, logger *zap.Logger,
fromBlock *big.Int, pollPeriod int64, listenerPollPeriod int64, listenerPollSize uint64,
dryRun bool, logger *zap.Logger,
) {
t.Logf("TimelockWorker.Listen(%v, %v, %v, %v, %v, %v, %v)", nodeURL, timelockAddress,
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod)
t.Logf("TimelockWorker.Listen(%v, %v, %v, %v, %v, %v, %v, %v)", nodeURL, timelockAddress,
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, listenerPollSize)
timelockWorker, err := timelock.NewTimelockWorker(nodeURL, timelockAddress,
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, dryRun, logger.Sugar())
callProxyAddress, privateKey, fromBlock, pollPeriod, listenerPollPeriod, listenerPollSize, dryRun, logger.Sugar())
require.NoError(t, err)
require.NotNil(t, timelockWorker)

@@ -209,3 +244,11 @@ func runTimelockWorker(
func assertLogMessage(t assert.TestingT, logs *observer.ObservedLogs, message string) {
assert.Equal(t, logs.FilterMessage(message).Len(), 1)
}

func logMessages(logs *observer.ObservedLogs) string {
m := make([]string, 0, logs.Len())
for _, entry := range logs.All() {
m = append(m, entry.Message)
}
return "LOGS:\n" + strings.Join(m, "\n")
}
