Skip to content

Commit

Permalink
Rework Reconcile scheduled sync
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Jan 16, 2025
1 parent 6c2b9df commit 8686e27
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 16 deletions.
12 changes: 4 additions & 8 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
klog "k8s.io/klog/v2"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
)

Expand Down Expand Up @@ -111,10 +110,10 @@ func run() error {
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
CacheDir: *cacheDir,
SyncInterval: *controlLoopSyncInterval,
ReadOnly: *readOnly,
Namespace: *namespace,
CacheDir: *cacheDir,
RequeueInterval: *controlLoopSyncInterval,
}

return runControlLoop(config, natsCfg, controllerCfg)
Expand Down Expand Up @@ -171,9 +170,6 @@ func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, control
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: klog.NewKlogr().WithName("controller-runtime"),
Cache: cache.Options{
SyncPeriod: &controllerCfg.SyncInterval,
},
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil

return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil
}

func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {
Expand Down
15 changes: 15 additions & 0 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"math/rand/v2"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -34,6 +35,8 @@ type JetStreamController interface {
//
// Returns the error of the operation or errors during client setup.
WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error

RequeueInterval() time.Duration
}

func NewJSController(k8sClient client.Client, natsConfig *NatsConfig, controllerConfig *Config) (JetStreamController, error) {
Expand All @@ -52,6 +55,18 @@ type jsController struct {
cacheDir string
}

func (c *jsController) RequeueInterval() time.Duration {
// Stagger the requeue slightly
interval := c.controllerConfig.RequeueInterval

// Allow up to a 10% variance
intervalRange := float64(interval.Nanoseconds()) * 0.1

randomFactor := (rand.Float64() * 2) - 1.0

return time.Duration(float64(interval.Nanoseconds()) + (intervalRange * randomFactor))
}

func (c *jsController) ReadOnly() bool {
return c.controllerConfig.ReadOnly
}
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/keyvalue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := r.createOrUpdate(ctx, log, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil

return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil
}

func (r *KeyValueReconciler) deleteKeyValue(ctx context.Context, log logr.Logger, keyValue *api.KeyValue) error {
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/objectstore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err := r.createOrUpdate(ctx, log, objectStore); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil

return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil
}

func (r *ObjectStoreReconciler) deleteObjectStore(ctx context.Context, log logr.Logger, objectStore *api.ObjectStore) error {
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
//
// Namespace restricts the controller to resources of the given namespace.
type Config struct {
ReadOnly bool
Namespace string
SyncInterval time.Duration
CacheDir string
ReadOnly bool
Namespace string
RequeueInterval time.Duration
CacheDir string
}

// RegisterAll registers all available jetStream controllers to the manager.
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
if err := r.createOrUpdate(ctx, log, stream); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil

return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil
}

func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, stream *api.Stream) error {
Expand Down

0 comments on commit 8686e27

Please sign in to comment.