From 180099abecbfdab2c421d55186f4fdd5b5ca284f Mon Sep 17 00:00:00 2001 From: Samuel Attwood <45669855+samuelattwood@users.noreply.github.com> Date: Thu, 16 Jan 2025 02:06:25 -0500 Subject: [PATCH] Add Account Controller (#224) * Add explicit Scheme field to Reconciler structs to better match convention * Update spec to assume consumer field 'Name' over deprecated 'DurableName' * Add account controller and support for account CRD auth config * Update CRDs. Slight change to connection options * Namespace and error handling improvements * Improve deleted log message * Namespace fix * Add check for removed Push Consumer options * Revert consumer name change * Add InactiveThreshold parsing * Fix test * Add Watch for Account resource changes to trigger reconcile of dependent resources. Improve connection opts handling * Add actual/desired state comparison for stream/consumer to avoid unnecessary update calls. Corrected ready state * Fix duplicate resource tracking * Improve config comparison logic * Add flags for sync interval and cache directory * Add back generation changed filter. Move finalizer add to after deletion check. * Rework Reconcile scheduled sync --- cmd/jetstream-controller/main.go | 25 +- deploy/crds.yml | 124 ++++--- internal/controller/account_controller.go | 191 ++++++++++- .../controller/account_controller_test.go | 171 +++++++-- internal/controller/client.go | 35 +- internal/controller/consumer_controller.go | 205 ++++++++--- .../controller/consumer_controller_test.go | 38 +- internal/controller/jetstream_controller.go | 220 +++++++++--- internal/controller/keyvalue_controller.go | 183 +++++++--- .../controller/keyvalue_controller_test.go | 38 +- internal/controller/objectstore_controller.go | 187 +++++++--- .../controller/objectstore_controller_test.go | 38 +- internal/controller/register.go | 32 +- internal/controller/stream_controller.go | 181 +++++++--- internal/controller/stream_controller_test.go | 38 +- internal/controller/suite_test.go | 8 +- internal/controller/types.go | 11 + .../apis/jetstream/v1beta2/consumertypes.go | 50 +-- pkg/jetstream/apis/jetstream/v1beta2/types.go | 23 +- .../v1beta2/zz_generated.deepcopy.go | 44 ++- .../jetstream/v1beta2/basestreamconfig.go | 51 +-- .../jetstream/v1beta2/connectionopts.go | 83 +++++ .../jetstream/v1beta2/consumerspec.go | 324 ++++++++---------- .../generated/applyconfiguration/utils.go | 2 + 24 files changed, 1571 insertions(+), 731 deletions(-) create mode 100644 pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/connectionopts.go diff --git a/cmd/jetstream-controller/main.go b/cmd/jetstream-controller/main.go index 32678f32..e58ff7ce 100644 --- a/cmd/jetstream-controller/main.go +++ b/cmd/jetstream-controller/main.go @@ -25,7 +25,7 @@ import ( "github.com/nats-io/nack/controllers/jetstream" "github.com/nats-io/nack/internal/controller" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + v1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" clientset "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -67,10 +67,12 @@ func run() error { ca := flag.String("tlsca", "", "NATS TLS certificate authority chain") tlsfirst := flag.Bool("tlsfirst", false, "If enabled, forces explicit TLS without waiting for Server INFO") server := flag.String("s", "", "NATS Server URL") - crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config") + crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config. Ignored if running with control loop, CRD options will always override global config") cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup") readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources") + cacheDir := flag.String("cache-dir", "", "Directory to store cached credential and TLS files") controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.") + controlLoopSyncInterval := flag.Duration("sync-interval", 5*time.Minute, "Interval to perform scheduled reconcile") flag.Parse() @@ -108,9 +110,12 @@ func run() error { } controllerCfg := &controller.Config{ - ReadOnly: *readOnly, - Namespace: *namespace, + ReadOnly: *readOnly, + Namespace: *namespace, + CacheDir: *cacheDir, + RequeueInterval: *controlLoopSyncInterval, } + return runControlLoop(config, natsCfg, controllerCfg) } @@ -160,17 +165,25 @@ func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, control // Setup scheme scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(jetstreamnatsiov1beta2.AddToScheme(scheme)) + utilruntime.Must(v1beta2.AddToScheme(scheme)) mgr, err := ctrl.NewManager(config, ctrl.Options{ Scheme: scheme, Logger: klog.NewKlogr().WithName("controller-runtime"), - // TODO Add full configuration }) if err != nil { return fmt.Errorf("unable to start manager: %w", err) } + if controllerCfg.CacheDir == "" { + cacheDir, err := os.MkdirTemp(".", "nack") + if err != nil { + return fmt.Errorf("create cache dir: %w", err) + } + defer os.RemoveAll(cacheDir) + controllerCfg.CacheDir = cacheDir + } + err = controller.RegisterAll(mgr, natsCfg, controllerCfg) if err != nil { return fmt.Errorf("register jetstream controllers: %w", err) diff --git a/deploy/crds.yml b/deploy/crds.yml index 17cdc203..055c1dcb 100644 --- a/deploy/crds.yml +++ b/deploy/crds.yml @@ -287,6 +287,10 @@ spec: type: array items: type: string + tlsFirst: + description: When true, the KV Store will iniate TLS before server INFO. + type: boolean + default: false status: type: object properties: @@ -513,6 +517,11 @@ spec: spec: type: object properties: + durableName: + description: The name of the Consumer. + type: string + pattern: '^[^.*>]+$' + minLength: 1 streamName: description: The name of the Stream to create the Consumer in. type: string @@ -533,11 +542,6 @@ spec: optStartTime: description: Time format must be RFC3339. type: string - durableName: - description: The name of the Consumer. - type: string - pattern: '^[^.*>]+$' - minLength: 1 deliverSubject: description: The subject to deliver observed messages, when not set, a pull-based Consumer is created. type: string @@ -626,6 +630,32 @@ spec: type: object additionalProperties: type: string + account: + description: Name of the account to which the Consumer belongs. + type: string + pattern: '^[^.*>]*$' + creds: + description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path. + type: string + default: '' + nkey: + description: NATS user NKey for connecting to servers. + type: string + default: '' + preventDelete: + description: When true, the managed Consumer will not be deleted when the resource is deleted + type: boolean + default: false + preventUpdate: + description: When true, the managed Consumer will not be updated when the resource is updated + type: boolean + default: false + servers: + description: A list of servers for creating consumer + type: array + items: + type: string + default: [] tls: description: A client's TLS certs and keys. type: object @@ -641,30 +671,8 @@ spec: type: array items: type: string - servers: - description: A list of servers for creating consumer - type: array - items: - type: string - default: [] - creds: - description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path. - type: string - default: '' - nkey: - description: NATS user NKey for connecting to servers. - type: string - default: '' - account: - description: Name of the account to which the Consumer belongs. - type: string - pattern: '^[^.*>]*$' - preventDelete: - description: When true, the managed Consumer will not be deleted when the resource is deleted - type: boolean - default: false - preventUpdate: - description: When true, the managed Consumer will not be updated when the resource is updated + tlsFirst: + description: When true, the KV Store will iniate TLS before server INFO. type: boolean default: false status: @@ -991,6 +999,19 @@ spec: type: string pattern: '^[^.*>]*$' minLength: 1 + creds: + description: The creds to be used to connect to the NATS Service. + type: object + properties: + secret: + type: object + properties: + name: + description: Name of the secret with the creds. + type: string + file: + description: Credentials file, generated with github.com/nats-io/nsc tool. + type: string servers: description: A list of servers to connect. type: array @@ -1017,19 +1038,6 @@ spec: key: description: Filename of the TLS cert key. type: string - creds: - description: The creds to be used to connect to the NATS Service. - type: object - properties: - secret: - type: object - properties: - name: - description: Name of the secret with the creds. - type: string - file: - description: Credentials file, generated with github.com/nats-io/nsc tool. - type: string token: description: The token to be used to connect to the NATS Service. type: object @@ -1059,6 +1067,30 @@ spec: password: description: Key in the secret that contains the password. type: string + tlsFirst: + description: When true, the KV Store will iniate TLS before server INFO. + type: boolean + default: false + status: + type: object + properties: + observedGeneration: + type: integer + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + type: string + lastTransitionTime: + type: string + reason: + type: string + message: + type: string --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition @@ -1241,6 +1273,10 @@ spec: type: array items: type: string + tlsFirst: + description: When true, the KV Store will iniate TLS before server INFO. + type: boolean + default: false status: type: object properties: @@ -1377,6 +1413,10 @@ spec: type: array items: type: string + tlsFirst: + description: When true, the KV Store will iniate TLS before server INFO. + type: boolean + default: false status: type: object properties: diff --git a/internal/controller/account_controller.go b/internal/controller/account_controller.go index 0be62eeb..2bcea621 100644 --- a/internal/controller/account_controller.go +++ b/internal/controller/account_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2024. +Copyright 2025. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,35 +18,210 @@ package controller import ( "context" + "errors" + "fmt" + "github.com/go-logr/logr" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" ) // AccountReconciler reconciles a Account object type AccountReconciler struct { + Scheme *runtime.Scheme JetStreamController } +type JetStreamResource interface { + GetName() string + GetNamespace() string +} + +type JetStreamResourceList []JetStreamResource + // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile +// It performs two main operations: +// - Initialize finalizer if not present +// - Remove the finalizer on deletion once no other resources are referencing the account +// +// A call to reconcile may perform only one action, expecting the reconciliation to be triggered again by an update. +// For example: Setting the finalizer triggers a second reconciliation. Reconcile returns after setting the finalizer, +// to prevent parallel reconciliations performing the same steps. func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := klog.FromContext(ctx) - log.Info("reconcile", "namespace", req.Namespace, "name", req.Name) - // TODO(user): your logic here + if ok := r.ValidNamespace(req.Namespace); !ok { + log.Info("Controller restricted to namespace, skipping reconciliation.") + return ctrl.Result{}, nil + } + + // Fetch Account resource + account := &api.Account{} + if err := r.Get(ctx, req.NamespacedName, account); err != nil { + if apierrors.IsNotFound(err) { + log.Info("Account deleted.", "accountName", req.NamespacedName.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("get account resource '%s': %w", req.NamespacedName.String(), err) + } + + // Update ready status to unknown when no status is set + if len(account.Status.Conditions) == 0 { + log.Info("Setting initial ready condition to unknown.") + account.Status.Conditions = updateReadyCondition(account.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation") + err := r.Status().Update(ctx, account) + if err != nil { + return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) + } + r.Get(ctx, req.NamespacedName, account) + log.Info("Status", "Conditions", account.Status.Conditions) + return ctrl.Result{Requeue: true}, nil + } + + // Check Deletion + markedForDeletion := account.GetDeletionTimestamp() != nil + if markedForDeletion { + if controllerutil.ContainsFinalizer(account, accountFinalizer) { + // Get list of resources referencing this account + requests := r.findDependentResources(ctx, log, account) + if len(requests) > 0 { + log.Info("Account still has dependent resources, cannot delete", "dependentCount", len(requests)) + account.Status.Conditions = updateReadyCondition( + account.Status.Conditions, + v1.ConditionFalse, + stateFinalizing, + "Account has dependent resources that must be deleted first", + ) + if err := r.Status().Update(ctx, account); err != nil { + return ctrl.Result{}, fmt.Errorf("update status: %w", err) + } + return ctrl.Result{Requeue: true}, nil + } + + log.Info("Removing Account finalizer") + if ok := controllerutil.RemoveFinalizer(account, accountFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to remove finalizer") + } + if err := r.Update(ctx, account); err != nil { + return ctrl.Result{}, fmt.Errorf("remove finalizer: %w", err) + } + } else { + log.Info("Account marked for deletion and already finalized. Ignoring.") + } + return ctrl.Result{}, nil + } + + // Add finalizer + if !controllerutil.ContainsFinalizer(account, accountFinalizer) { + log.Info("Adding Account finalizer.") + if ok := controllerutil.AddFinalizer(account, accountFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to account resource") + } + + if err := r.Update(ctx, account); err != nil { + return ctrl.Result{}, fmt.Errorf("update account resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + + // Update ready status for non-deleted accounts + account.Status.ObservedGeneration = account.Generation + account.Status.Conditions = updateReadyCondition( + account.Status.Conditions, + v1.ConditionTrue, + stateReady, + "Account is ready", + ) + if err := r.Status().Update(ctx, account); err != nil { + return ctrl.Result{}, fmt.Errorf("update status: %w", err) + } return ctrl.Result{}, nil } +func (r *AccountReconciler) findDependentResources(ctx context.Context, log logr.Logger, obj client.Object) []reconcile.Request { + var resourceList JetStreamResourceList + + var consumerList api.ConsumerList + if err := r.List(ctx, &consumerList, + client.InNamespace(obj.GetNamespace()), + ); err != nil { + log.Error(err, "Failed to list consumers") + } + for _, i := range consumerList.Items { + if i.Spec.Account == obj.GetName() { + resourceList = append(resourceList, &i) + } + } + + var keyValueList api.KeyValueList + if err := r.List(ctx, &keyValueList, + client.InNamespace(obj.GetNamespace()), + ); err != nil { + log.Error(err, "Failed to list accounts") + } + for _, i := range keyValueList.Items { + if i.Spec.Account == obj.GetName() { + resourceList = append(resourceList, &i) + } + } + + var objectStoreList api.ObjectStoreList + if err := r.List(ctx, &objectStoreList, + client.InNamespace(obj.GetNamespace()), + ); err != nil { + log.Error(err, "Failed to list objectstores") + } + for _, i := range objectStoreList.Items { + if i.Spec.Account == obj.GetName() { + resourceList = append(resourceList, &i) + } + } + + var streamList api.StreamList + if err := r.List(ctx, &streamList, + client.InNamespace(obj.GetNamespace()), + ); err != nil { + log.Error(err, "Failed to list streams") + } + for _, i := range streamList.Items { + if i.Spec.Account == obj.GetName() { + resourceList = append(resourceList, &i) + } + } + + requests := make([]reconcile.Request, 0, len(resourceList)) + for _, resource := range resourceList { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: resource.GetNamespace(), + Name: resource.GetName(), + }, + }) + } + + return requests +} + // SetupWithManager sets up the controller with the Manager. func (r *AccountReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&jetstreamnatsiov1beta2.Account{}). + For(&api.Account{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). Complete(r) } diff --git a/internal/controller/account_controller_test.go b/internal/controller/account_controller_test.go index 46debade..2067b758 100644 --- a/internal/controller/account_controller_test.go +++ b/internal/controller/account_controller_test.go @@ -17,67 +17,164 @@ limitations under the License. package controller import ( - "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/reconcile" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ) var _ = Describe("Account Controller", func() { Context("When reconciling a resource", func() { - const resourceName = "test-resource" - - ctx := context.Background() + const resourceName = "test-account" typeNamespacedName := types.NamespacedName{ Name: resourceName, - Namespace: "default", // TODO(user):Modify as needed + Namespace: "default", } - account := &jetstreamnatsiov1beta2.Account{} + account := &api.Account{} + + // Tested controller + var controller *AccountReconciler + + BeforeEach(func(ctx SpecContext) { + controller = &AccountReconciler{ + Scheme: k8sClient.Scheme(), + JetStreamController: baseController, + } + }) + + When("the resource is marked for deletion", func() { + var stream *api.Stream + var streamName types.NamespacedName + + BeforeEach(func(ctx SpecContext) { + By("creating the custom resource for the Kind Account") + err := k8sClient.Get(ctx, typeNamespacedName, account) + if err != nil && k8serrors.IsNotFound(err) { + resource := &api.Account{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + Spec: api.AccountSpec{ + Servers: []string{"nats://nats.io"}, + }, + } + + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + Expect(k8sClient.Get(ctx, typeNamespacedName, account)).To(Succeed()) - BeforeEach(func() { - By("creating the custom resource for the Kind Account") - err := k8sClient.Get(ctx, typeNamespacedName, account) - if err != nil && errors.IsNotFound(err) { - resource := &jetstreamnatsiov1beta2.Account{ + controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + + Expect(k8sClient.Get(ctx, typeNamespacedName, account)).To(Succeed()) + Expect(controllerutil.ContainsFinalizer(account, accountFinalizer)).To(BeTrue()) + } + + By("creating a dependent stream resource") + stream = &api.Stream{ ObjectMeta: metav1.ObjectMeta{ - Name: resourceName, + Name: "test-stream", Namespace: "default", }, - // TODO(user): Specify other spec details if needed. + Spec: api.StreamSpec{ + Name: "test-stream", + Replicas: 1, + Discard: "old", + Storage: "file", + Retention: "workqueue", + BaseStreamConfig: api.BaseStreamConfig{ + ConnectionOpts: api.ConnectionOpts{ + Account: resourceName, + }, + }, + }, } - Expect(k8sClient.Create(ctx, resource)).To(Succeed()) - } - }) + streamName = types.NamespacedName{ + Name: stream.Name, + Namespace: stream.Namespace, + } + Expect(k8sClient.Create(ctx, stream)).To(Succeed()) - AfterEach(func() { - // TODO(user): Cleanup logic after each test, like removing the resource instance. - resource := &jetstreamnatsiov1beta2.Account{} - err := k8sClient.Get(ctx, typeNamespacedName, resource) - Expect(err).NotTo(HaveOccurred()) + By("marking the account for deletion") + Expect(k8sClient.Delete(ctx, account)).To(Succeed()) + Expect(k8sClient.Get(ctx, typeNamespacedName, account)).To(Succeed()) + }) - By("Cleanup the specific resource instance Account") - Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) - }) - It("should successfully reconcile the resource", func() { - By("Reconciling the created resource") - controllerReconciler := &AccountReconciler{ - baseController, - } + AfterEach(func(ctx SpecContext) { + By("cleaning up the stream") + stream := &api.Stream{} + err := k8sClient.Get(ctx, streamName, stream) + if err == nil { + Expect(k8sClient.Delete(ctx, stream)).To(Succeed()) + } + + By("removing the account resource") + controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(k8sClient.Get(ctx, typeNamespacedName, account)).To(Not(Succeed())) + }) + + It("should not remove finalizer while dependent resources exist", func(ctx SpecContext) { + By("reconciling the deletion") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + + By("checking the account still exists") + Expect(k8sClient.Get(ctx, typeNamespacedName, account)).To(Succeed()) + Expect(account.Finalizers).To(ContainElement(accountFinalizer)) + + By("verifying the ready condition is set to false") + Expect(account.Status.Conditions).To(HaveLen(1)) + assertReadyStateMatches( + account.Status.Conditions[0], + v1.ConditionFalse, + stateFinalizing, + "Account has dependent resources that must be deleted first", + time.Now(), + ) + }) + + It("should remove finalizer after dependent resources are removed", func(ctx SpecContext) { + By("removing the dependent stream") + Expect(k8sClient.Delete(ctx, stream)).To(Succeed()) + + By("reconciling the deletion") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) + + By("checking the account is deleted") + err = k8sClient.Get(ctx, typeNamespacedName, account) + Expect(err).To(HaveOccurred()) + Expect(k8serrors.IsNotFound(err)).To(BeTrue()) + }) + + It("should remove finalizer after dependent resources are updated", func(ctx SpecContext) { + By("updating the dependent stream to remove account reference") + Expect(k8sClient.Get(ctx, streamName, stream)).To(Succeed()) + stream.Spec.Account = "" + Expect(k8sClient.Update(ctx, stream)).To(Succeed()) + + By("reconciling the deletion") + result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.IsZero()).To(BeTrue()) - _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ - NamespacedName: typeNamespacedName, + By("checking the account is deleted") + err = k8sClient.Get(ctx, typeNamespacedName, account) + Expect(err).To(HaveOccurred()) + Expect(k8serrors.IsNotFound(err)).To(BeTrue()) }) - Expect(err).NotTo(HaveOccurred()) - // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. - // Example: If you expect a certain status condition after reconciliation, verify it here. }) }) }) diff --git a/internal/controller/client.go b/internal/controller/client.go index 5eb9b8d6..7e79a658 100644 --- a/internal/controller/client.go +++ b/internal/controller/client.go @@ -23,6 +23,10 @@ type NatsConfig struct { func (o *NatsConfig) buildOptions() ([]nats.Option, error) { opts := make([]nats.Option, 0) + if o.ServerURL == "" { + return nil, fmt.Errorf("server url is required") + } + if o.TLSFirst { opts = append(opts, nats.TLSHandshakeFirst()) } @@ -31,25 +35,24 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) { opts = append(opts, nats.Name(o.ClientName)) } - if !o.CRDConnect { - // Use JWT/NKEYS based credentials if present. - if o.Credentials != "" { - opts = append(opts, nats.UserCredentials(o.Credentials)) - } else if o.NKey != "" { - opt, err := nats.NkeyOptionFromSeed(o.NKey) - if err != nil { - return nil, fmt.Errorf("nkey option from seed: %w", err) - } - opts = append(opts, opt) - } + if o.Credentials != "" { + opts = append(opts, nats.UserCredentials(o.Credentials)) + } - if o.Certificate != "" && o.Key != "" { - opts = append(opts, nats.ClientCert(o.Certificate, o.Key)) + if o.NKey != "" { + opt, err := nats.NkeyOptionFromSeed(o.NKey) + if err != nil { + return nil, fmt.Errorf("nkey option from seed: %w", err) } + opts = append(opts, opt) + } - if len(o.CAs) > 0 { - opts = append(opts, nats.RootCAs(o.CAs...)) - } + if o.Certificate != "" && o.Key != "" { + opts = append(opts, nats.ClientCert(o.Certificate, o.Key)) + } + + if len(o.CAs) > 0 { + opts = append(opts, nats.RootCAs(o.CAs...)) } return opts, nil diff --git a/internal/controller/consumer_controller.go b/internal/controller/consumer_controller.go index 67540458..21efa23f 100644 --- a/internal/controller/consumer_controller.go +++ b/internal/controller/consumer_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -26,7 +27,10 @@ import ( "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -36,6 +40,8 @@ import ( // ConsumerReconciler reconciles a Consumer object type ConsumerReconciler struct { + Scheme *runtime.Scheme + JetStreamController } @@ -56,7 +62,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c consumer := &api.Consumer{} if err := r.Get(ctx, req.NamespacedName, consumer); err != nil { if apierrors.IsNotFound(err) { - log.Info("Consumer resource not found. Ignoring since object must be deleted.") + log.Info("Consumer deleted.", "consumerName", req.NamespacedName.String()) return ctrl.Result{}, nil } return ctrl.Result{}, fmt.Errorf("get consumer resource '%s': %w", req.NamespacedName.String(), err) @@ -70,7 +76,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // Update ready status to unknown when no status is set if len(consumer.Status.Conditions) == 0 { log.Info("Setting initial ready condition to unknown.") - consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation") err := r.Status().Update(ctx, consumer) if err != nil { return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) @@ -78,6 +84,21 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{Requeue: true}, nil } + // Check Deletion + markedForDeletion := consumer.GetDeletionTimestamp() != nil + if markedForDeletion { + if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) { + err := r.deleteConsumer(ctx, log, consumer) + if err != nil { + return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err) + } + } else { + log.Info("Consumer marked for deletion and already finalized. Ignoring.") + } + + return ctrl.Result{}, nil + } + // Add finalizer if !controllerutil.ContainsFinalizer(consumer, consumerFinalizer) { log.Info("Adding consumer finalizer.") @@ -91,37 +112,52 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } - // Check Deletion - markedForDeletion := consumer.GetDeletionTimestamp() != nil - if markedForDeletion { - if controllerutil.ContainsFinalizer(consumer, consumerFinalizer) { - err := r.deleteConsumer(ctx, log, consumer) - if err != nil { - return ctrl.Result{}, fmt.Errorf("delete consumer: %w", err) - } - } else { - log.Info("Consumer marked for deletion and already finalized. Ignoring.") + if consumer.Spec.FlowControl || consumer.Spec.DeliverSubject != "" || consumer.Spec.DeliverGroup != "" || consumer.Spec.HeartbeatInterval != "" { + log.Info("FlowControl, DeliverSubject, DeliverGroup, and HeartbeatInterval are Push Consumer options, which are not supported. Skipping consumer creation or update.") + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, "Push Consumer options are not supported.") + if err := r.Status().Update(ctx, consumer); err != nil { + log.Error(err, "Failed to update ready condition to Errored.") } - return ctrl.Result{}, nil } // Create or update stream if err := r.createOrUpdate(ctx, log, consumer); err != nil { + if err := r.Get(ctx, client.ObjectKeyFromObject(consumer), consumer); err != nil { + return ctrl.Result{}, fmt.Errorf("get consumer resource: %w", err) + } + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error()) + if err := r.Status().Update(ctx, consumer); err != nil { + log.Error(err, "Failed to update ready condition to Errored.") + } return ctrl.Result{}, fmt.Errorf("create or update: %s", err) } - return ctrl.Result{}, nil + + return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil } func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error { // Set status to false - consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateFinalizing, "Performing finalizer operations.") if err := r.Status().Update(ctx, consumer); err != nil { return fmt.Errorf("update ready condition: %w", err) } + storedState, err := getStoredConsumerState(consumer) + if err != nil { + log.Error(err, "Failed to fetch stored state.") + } + if !consumer.Spec.PreventDelete && !r.ReadOnly() { - err := r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error { + err := r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error { + _, err := getServerConsumerState(ctx, js, consumer) + // If we have no known state for this consumer it has never been reconciled. + // If we are also receiving an error fetching state, either the consumer does not exist + // or this resource config is invalid. + if err != nil && storedState == nil { + return nil + } + return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName) }) switch { @@ -130,7 +166,11 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger case errors.Is(err, jetstream.ErrStreamNotFound): log.Info("Stream of consumer does not exist. Unable to delete.") case err != nil: - return fmt.Errorf("delete jetstream consumer: %w", err) + if storedState == nil { + log.Info("Consumer not reconciled and no state received from server. Removing finalizer.") + } else { + return fmt.Errorf("delete jetstream consumer: %w", err) + } default: log.Info("Consumer deleted.") } @@ -155,54 +195,84 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error { // Create or Update the stream based on the spec - if r.ReadOnly() { - log.Info("Skipping consumer creation or update.", - "read-only", r.ReadOnly(), - ) - return nil - } - // Map spec to consumer target config targetConfig, err := consumerSpecToConfig(&consumer.Spec) if err != nil { return fmt.Errorf("map consumer spec to target config: %w", err) } - err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error { - consumerName := targetConfig.Name - if consumerName == "" { - consumerName = targetConfig.Durable + err = r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error { + storedState, err := getStoredConsumerState(consumer) + if err != nil { + log.Error(err, "Failed to fetch stored consumer state.") } - exists := false - _, err := js.Consumer(ctx, consumer.Spec.StreamName, consumerName) - if err == nil { - exists = true - } else if !errors.Is(err, jetstream.ErrConsumerNotFound) { + serverState, err := getServerConsumerState(ctx, js, consumer) + if err != nil { return err } - if !exists { - log.Info("Creating Consumer.") - _, err := js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) - return err + // Check against known state. Skip Update if converged. + // Storing returned state from the server avoids have to + // check default values or call Update on already converged resources + if storedState != nil && serverState != nil && consumer.Status.ObservedGeneration == consumer.Generation { + diff := compareConfigState(storedState, serverState) + + if diff == "" { + return nil + } + + log.Info("Consumer config drifted from desired state.", "diff", diff) } - if !consumer.Spec.PreventUpdate { + if r.ReadOnly() { + log.Info("Skipping consumer creation or update.", + "read-only", r.ReadOnly(), + ) + return nil + } + + var updatedConsumer jetstream.Consumer + err = nil + + if serverState == nil { + log.Info("Creating Consumer.") + updatedConsumer, err = js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) + if err != nil { + return err + } + } else if !consumer.Spec.PreventUpdate { log.Info("Updating Consumer.") - _, err := js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) - return err + updatedConsumer, err = js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig) + if err != nil { + return err + } } else { log.Info("Skipping Consumer update.", "preventUpdate", consumer.Spec.PreventUpdate, ) } + if updatedConsumer != nil { + // Store known state in annotation + updatedState, err := json.Marshal(updatedConsumer.CachedInfo().Config) + if err != nil { + return err + } + + if consumer.Annotations == nil { + consumer.Annotations = map[string]string{} + } + consumer.Annotations[stateAnnotationConsumer] = string(updatedState) + + return r.Update(ctx, consumer) + } + return nil }) if err != nil { err = fmt.Errorf("create or update consumer: %w", err) - consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error()) if err := r.Status().Update(ctx, consumer); err != nil { log.Error(err, "Failed to update ready condition to Errored.") } @@ -214,7 +284,7 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger consumer.Status.Conditions = updateReadyCondition( consumer.Status.Conditions, v1.ConditionTrue, - "Reconciling", + stateReady, "Consumer successfully created or updated.", ) err = r.Status().Update(ctx, consumer) @@ -225,14 +295,30 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger return nil } -func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions { - return &connectionOptions{ - Account: spec.Account, - Creds: spec.Creds, - Nkey: spec.Nkey, - Servers: spec.Servers, - TLS: spec.TLS, +func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig, error) { + var storedState *jetstream.ConsumerConfig + if state, ok := consumer.Annotations[stateAnnotationConsumer]; ok { + err := json.Unmarshal([]byte(state), &storedState) + if err != nil { + return nil, err + } + } + + return storedState, nil +} + +// Fetch the current state of the consumer from the server. +// ErrConsumerNotFound is considered a valid response and does not return error +func getServerConsumerState(ctx context.Context, js jetstream.JetStream, consumer *api.Consumer) (*jetstream.ConsumerConfig, error) { + c, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName) + if errors.Is(err, jetstream.ErrConsumerNotFound) { + return nil, nil } + if err != nil { + return nil, err + } + + return &c.CachedInfo().Config, nil } func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) { @@ -253,15 +339,11 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er MemoryStorage: spec.MemStorage, FilterSubjects: spec.FilterSubjects, Metadata: spec.Metadata, - - // Explicitly set not (yet) mapped fields - Name: "", - InactiveThreshold: 0, } // DeliverPolicy if spec.DeliverPolicy != "" { - err := config.DeliverPolicy.UnmarshalJSON(asJsonString(spec.DeliverPolicy)) + err := config.DeliverPolicy.UnmarshalJSON(jsonString(spec.DeliverPolicy)) if err != nil { return nil, fmt.Errorf("invalid delivery policy: %w", err) } @@ -278,7 +360,7 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er // AckPolicy if spec.AckPolicy != "" { - err := config.AckPolicy.UnmarshalJSON(asJsonString(spec.AckPolicy)) + err := config.AckPolicy.UnmarshalJSON(jsonString(spec.AckPolicy)) if err != nil { return nil, fmt.Errorf("invalid ack policy: %w", err) } @@ -305,7 +387,7 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er // ReplayPolicy if spec.ReplayPolicy != "" { - err := config.ReplayPolicy.UnmarshalJSON(asJsonString(spec.ReplayPolicy)) + err := config.ReplayPolicy.UnmarshalJSON(jsonString(spec.ReplayPolicy)) if err != nil { return nil, fmt.Errorf("invalid replay policy: %w", err) } @@ -320,6 +402,14 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er config.MaxRequestExpires = d } + if spec.InactiveThreshold != "" { + d, err := time.ParseDuration(spec.InactiveThreshold) + if err != nil { + return nil, fmt.Errorf("invalid inactive threshold: %w", err) + } + config.InactiveThreshold = d + } + return config, nil } @@ -328,5 +418,8 @@ func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&api.Consumer{}). WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). Complete(r) } diff --git a/internal/controller/consumer_controller_test.go b/internal/controller/consumer_controller_test.go index bf5410a1..5b72f579 100644 --- a/internal/controller/consumer_controller_test.go +++ b/internal/controller/consumer_controller_test.go @@ -61,7 +61,7 @@ var _ = Describe("Consumer Controller", func() { Durable: consumerName, } - // Tested coontroller + // Tested controller var controller *ConsumerReconciler BeforeEach(func(ctx SpecContext) { @@ -93,7 +93,8 @@ var _ = Describe("Consumer Controller", func() { By("setting up the tested controller") controller = &ConsumerReconciler{ - baseController, + Scheme: k8sClient.Scheme(), + JetStreamController: baseController, } }) @@ -170,7 +171,7 @@ var _ = Describe("Consumer Controller", func() { Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed()) Expect(consumer.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionUnknown, stateReconciling, "Starting reconciliation", time.Now()) }) }) @@ -211,7 +212,7 @@ var _ = Describe("Consumer Controller", func() { assertReadyStateMatches( consumer.Status.Conditions[0], v1.ConditionFalse, - "Errored", + stateErrored, "stream", // Not existing stream as message time.Now(), ) @@ -229,7 +230,7 @@ var _ = Describe("Consumer Controller", func() { By("checking if the ready state was updated") Expect(consumer.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionTrue, stateReady, "created or updated", time.Now()) By("checking if the observed generation matches") Expect(consumer.Status.ObservedGeneration).To(Equal(consumer.Generation)) @@ -320,9 +321,10 @@ var _ = Describe("Consumer Controller", func() { When("read-only mode is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &ConsumerReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -357,9 +359,10 @@ var _ = Describe("Consumer Controller", func() { When("namespace restriction is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting a namespace on the resource") - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &ConsumerReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -485,6 +488,7 @@ var _ = Describe("Consumer Controller", func() { readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &ConsumerReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -510,6 +514,7 @@ var _ = Describe("Consumer Controller", func() { namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &ConsumerReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -594,13 +599,10 @@ func Test_consumerSpecToConfig(t *testing.T) { AckPolicy: "explicit", AckWait: "10ns", BackOff: []string{"1s", "5m"}, - Creds: "", DeliverGroup: "", DeliverPolicy: "new", DeliverSubject: "", Description: "test consumer", - PreventDelete: false, - PreventUpdate: false, DurableName: "test-consumer", FilterSubject: "time.us.>", FilterSubjects: []string{"time.us.east", "time.us.west"}, @@ -614,23 +616,29 @@ func Test_consumerSpecToConfig(t *testing.T) { MaxRequestMaxBytes: 1024, MaxWaiting: 5, MemStorage: true, - Nkey: "", OptStartSeq: 17, OptStartTime: dateString, RateLimitBps: 512, ReplayPolicy: "instant", Replicas: 9, SampleFreq: "25%", - Servers: nil, StreamName: "", - TLS: api.TLS{}, - Account: "", Metadata: map[string]string{ "meta": "data", }, + BaseStreamConfig: api.BaseStreamConfig{ + PreventDelete: false, + PreventUpdate: false, + ConnectionOpts: api.ConnectionOpts{ + Account: "", + Creds: "", + Nkey: "", + TLS: api.TLS{}, + Servers: nil, + }, + }, }, want: &jetstream.ConsumerConfig{ - Name: "", // Optional, not mapped Durable: "test-consumer", Description: "test consumer", DeliverPolicy: jetstream.DeliverNewPolicy, diff --git a/internal/controller/jetstream_controller.go b/internal/controller/jetstream_controller.go index 63e8f720..05fa338b 100644 --- a/internal/controller/jetstream_controller.go +++ b/internal/controller/jetstream_controller.go @@ -1,26 +1,24 @@ package controller import ( + "context" "fmt" + "math/rand/v2" + "os" + "path/filepath" "strings" "time" + "github.com/google/go-cmp/cmp" js "github.com/nats-io/nack/controllers/jetstream" api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) -type connectionOptions struct { - Account string `json:"account"` - Creds string `json:"creds"` - Nkey string `json:"nkey"` - Servers []string `json:"servers"` - TLS api.TLS `json:"tls"` -} - type JetStreamController interface { client.Client @@ -36,7 +34,9 @@ type JetStreamController interface { // The given opts values take precedence over the controllers base configuration. // // Returns the error of the operation or errors during client setup. - WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error + WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error + + RequeueInterval() time.Duration } func NewJSController(k8sClient client.Client, natsConfig *NatsConfig, controllerConfig *Config) (JetStreamController, error) { @@ -44,6 +44,7 @@ func NewJSController(k8sClient client.Client, natsConfig *NatsConfig, controller Client: k8sClient, config: natsConfig, controllerConfig: controllerConfig, + cacheDir: controllerConfig.CacheDir, }, nil } @@ -51,6 +52,19 @@ type jsController struct { client.Client config *NatsConfig controllerConfig *Config + cacheDir string +} + +func (c *jsController) RequeueInterval() time.Duration { + // Stagger the requeue slightly + interval := c.controllerConfig.RequeueInterval + + // Allow up to a 10% variance + intervalRange := float64(interval.Nanoseconds()) * 0.1 + + randomFactor := (rand.Float64() * 2) - 1.0 + + return time.Duration(float64(interval.Nanoseconds()) + (intervalRange * randomFactor)) } func (c *jsController) ReadOnly() bool { @@ -62,10 +76,13 @@ func (c *jsController) ValidNamespace(namespace string) bool { return ns == "" || ns == namespace } -func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error { +func (c *jsController) WithJetStreamClient(opts api.ConnectionOpts, ns string, op func(js jetstream.JetStream) error) error { // Build single use client // TODO(future-feature): Use client-pool instead of single use client - cfg := c.buildNatsConfig(opts) + cfg, err := c.natsConfigFromOpts(opts, ns) + if err != nil { + return err + } jsClient, closer, err := CreateJetStreamClient(cfg, true) if err != nil { @@ -76,56 +93,155 @@ func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js j return op(jsClient) } -// buildNatsConfig uses given opts to override the base NatsConfig. -func (c *jsController) buildNatsConfig(opts *connectionOptions) *NatsConfig { - serverUrls := strings.Join(opts.Servers, ",") +// Setup default options, override from account resource and CRD options if configured +func (c *jsController) natsConfigFromOpts(opts api.ConnectionOpts, ns string) (*NatsConfig, error) { + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + defer done() - // Takes opts values if present - cfg := &NatsConfig{ - CRDConnect: false, - ClientName: c.config.ClientName, - ServerURL: or(serverUrls, c.config.ServerURL), - TLSFirst: c.config.TLSFirst, // TODO(future-feature): expose TLSFirst in the spec config + natsConfig := &NatsConfig{ + ClientName: c.config.ClientName, + Credentials: c.config.Credentials, + NKey: c.config.NKey, + ServerURL: c.config.ServerURL, + CAs: c.config.CAs, + TLSFirst: c.config.TLSFirst, } - // Note: The opts.Account value coming from the resource spec is currently not considered. - // creds/nkey are associated with an account, the account field might be redundant. - // See https://github.com/nats-io/nack/pull/211#pullrequestreview-2511111670 + if c.config.Certificate != "" && c.config.Key != "" { + natsConfig.Certificate = c.config.Certificate + natsConfig.Key = c.config.Key + } - // Authentication either from opts or base config - if opts.Creds != "" || opts.Nkey != "" { - cfg.Credentials = opts.Creds - cfg.NKey = opts.Nkey - } else { - cfg.Credentials = c.config.Credentials - cfg.NKey = c.config.NKey + if opts.Account == "" { + return applyConnOpts(*natsConfig, opts), nil } - // CAs from opts or base config - if len(opts.TLS.RootCAs) > 0 { - cfg.CAs = opts.TLS.RootCAs - } else { - cfg.CAs = c.config.CAs + // Apply Account options first, over global. + // Apply remaining CRD options last + + account := &api.Account{} + err := c.Get(ctx, + types.NamespacedName{ + Name: opts.Account, + Namespace: ns, + }, + account, + ) + if err != nil { + return nil, err } - // Client Cert and Key either from opts or base config - if opts.TLS.ClientCert != "" && opts.TLS.ClientKey != "" { - cfg.Certificate = opts.TLS.ClientCert - cfg.Key = opts.TLS.ClientKey - } else { - cfg.Certificate = c.config.Certificate - cfg.Key = c.config.Key + if len(account.Spec.Servers) > 0 { + natsConfig.ServerURL = strings.Join(account.Spec.Servers, ",") } - return cfg + if account.Spec.TLS != nil && account.Spec.TLS.Secret != nil { + tlsSecret := &v1.Secret{} + err := c.Get(ctx, + types.NamespacedName{ + Name: account.Spec.TLS.Secret.Name, + Namespace: ns, + }, + tlsSecret, + ) + if err != nil { + return nil, err + } + + accDir := filepath.Join(c.cacheDir, c.controllerConfig.Namespace, opts.Account) + if err := os.MkdirAll(accDir, 0o755); err != nil { + return nil, err + } + + for k, v := range tlsSecret.Data { + filePath := "" + switch k { + case account.Spec.TLS.ClientCert: + filePath = filepath.Join(accDir, account.Spec.TLS.ClientCert) + natsConfig.Certificate = filePath + case account.Spec.TLS.ClientKey: + filePath = filepath.Join(accDir, account.Spec.TLS.ClientKey) + natsConfig.Key = filePath + case account.Spec.TLS.RootCAs: + filePath = filepath.Join(accDir, account.Spec.TLS.RootCAs) + natsConfig.CAs = append(natsConfig.CAs, filePath) + default: + return nil, fmt.Errorf("key in TLS secret does not match any of the expected values") + } + if err := os.WriteFile(filePath, v, 0o600); err != nil { + return nil, err + } + } + } else if account.Spec.TLS != nil { + natsConfig.Certificate = account.Spec.TLS.ClientCert + natsConfig.Key = account.Spec.TLS.ClientKey + natsConfig.CAs = []string{account.Spec.TLS.RootCAs} + } + + if account.Spec.Creds != nil && account.Spec.Creds.Secret != nil { + credsSecret := &v1.Secret{} + err := c.Get(ctx, + types.NamespacedName{ + Name: account.Spec.Creds.Secret.Name, + Namespace: ns, + }, + credsSecret, + ) + if err != nil { + return nil, err + } + + accDir := filepath.Join(c.cacheDir, c.controllerConfig.Namespace, opts.Account) + if err := os.MkdirAll(accDir, 0o755); err != nil { + return nil, err + } + + for k, v := range credsSecret.Data { + filePath := "" + switch k { + case account.Spec.Creds.File: + filePath = filepath.Join(accDir, account.Spec.Creds.File) + natsConfig.Credentials = filePath + default: + return nil, fmt.Errorf("key in Creds secret does not match any of the expected values") + } + if err := os.WriteFile(filePath, v, 0o600); err != nil { + return nil, err + } + } + } else if account.Spec.Creds.File != "" { + natsConfig.Credentials = account.Spec.Creds.File + } + + return applyConnOpts(*natsConfig, opts), nil } -// or returns the value if it is not the null value. Otherwise, the fallback value is returned -func or[T comparable](v T, fallback T) T { - if v == *new(T) { - return fallback +func applyConnOpts(baseConfig NatsConfig, opts api.ConnectionOpts) *NatsConfig { + natsConfig := baseConfig + if len(opts.Servers) > 0 { + natsConfig.ServerURL = strings.Join(opts.Servers, ",") + } + + // Currently, if the global TLSFirst is set, a false value in the CRD will not override + // due to that being the bool zero value. A true value in the CRD can override a global false. + if opts.TLSFirst { + natsConfig.TLSFirst = opts.TLSFirst } - return v + + if opts.Creds != "" { + natsConfig.Credentials = opts.Creds + } + + if len(opts.TLS.RootCAs) > 0 { + natsConfig.CAs = opts.TLS.RootCAs + } + + if opts.TLS.ClientCert != "" && opts.TLS.ClientKey != "" { + natsConfig.Certificate = opts.TLS.ClientCert + natsConfig.Key = opts.TLS.ClientKey + } + + return &natsConfig } // updateReadyCondition returns the given conditions with an added or updated ready condition. @@ -159,8 +275,12 @@ func updateReadyCondition(conditions []api.Condition, status v1.ConditionStatus, } } -// asJsonString returns the given string wrapped in " and converted to []byte. +// jsonString returns the given string wrapped in " and converted to []byte. // Helper for mapping spec config to jetStream config using UnmarshalJSON. -func asJsonString(v string) []byte { +func jsonString(v string) []byte { return []byte("\"" + v + "\"") } + +func compareConfigState(actual any, desired any) string { + return cmp.Diff(actual, desired) +} diff --git a/internal/controller/keyvalue_controller.go b/internal/controller/keyvalue_controller.go index e174ee0c..11a4261f 100644 --- a/internal/controller/keyvalue_controller.go +++ b/internal/controller/keyvalue_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -27,14 +28,21 @@ import ( "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) +const ( + kvStreamPrefix = "KV_" +) + // KeyValueReconciler reconciles a KeyValue object type KeyValueReconciler struct { + Scheme *runtime.Scheme JetStreamController } @@ -61,7 +69,7 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c keyValue := &api.KeyValue{} if err := r.Get(ctx, req.NamespacedName, keyValue); err != nil { if apierrors.IsNotFound(err) { - log.Info("KeyValue resource not found. Ignoring since object must be deleted.") + log.Info("KeyValue deleted.", "keyValueName", req.NamespacedName.String()) return ctrl.Result{}, nil } return ctrl.Result{}, fmt.Errorf("get keyvalue resource '%s': %w", req.NamespacedName.String(), err) @@ -72,7 +80,7 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // Update ready status to unknown when no status is set if len(keyValue.Status.Conditions) == 0 { log.Info("Setting initial ready condition to unknown.") - keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation") err := r.Status().Update(ctx, keyValue) if err != nil { return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) @@ -80,19 +88,6 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{Requeue: true}, nil } - // Add finalizer - if !controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) { - log.Info("Adding KeyValue finalizer.") - if ok := controllerutil.AddFinalizer(keyValue, keyValueFinalizer); !ok { - return ctrl.Result{}, errors.New("failed to add finalizer to keyvalue resource") - } - - if err := r.Update(ctx, keyValue); err != nil { - return ctrl.Result{}, fmt.Errorf("update keyvalue resource to add finalizer: %w", err) - } - return ctrl.Result{}, nil - } - // Check Deletion markedForDeletion := keyValue.GetDeletionTimestamp() != nil if markedForDeletion { @@ -108,27 +103,56 @@ func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } + // Add finalizer + if !controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) { + log.Info("Adding KeyValue finalizer.") + if ok := controllerutil.AddFinalizer(keyValue, keyValueFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to keyvalue resource") + } + + if err := r.Update(ctx, keyValue); err != nil { + return ctrl.Result{}, fmt.Errorf("update keyvalue resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + // Create or update KeyValue if err := r.createOrUpdate(ctx, log, keyValue); err != nil { return ctrl.Result{}, fmt.Errorf("create or update: %s", err) } - return ctrl.Result{}, nil + + return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil } func (r *KeyValueReconciler) deleteKeyValue(ctx context.Context, log logr.Logger, keyValue *api.KeyValue) error { // Set status to false - keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, stateFinalizing, "Performing finalizer operations.") if err := r.Status().Update(ctx, keyValue); err != nil { return fmt.Errorf("update ready condition: %w", err) } + storedState, err := getStoredKeyValueState(keyValue) + if err != nil { + log.Error(err, "Failed to fetch stored state.") + } + if !keyValue.Spec.PreventDelete && !r.ReadOnly() { log.Info("Deleting KeyValue.") - err := r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error { + err := r.WithJetStreamClient(keyValue.Spec.ConnectionOpts, keyValue.Namespace, func(js jetstream.JetStream) error { + _, err := getServerKeyValueState(ctx, js, keyValue) + // If we have no known state for this KeyValue it has never been reconciled. + // If we are also receiving an error fetching state, either the KeyValue does not exist + // or this resource config is invalid. + if err != nil && storedState == nil { + return nil + } + return js.DeleteKeyValue(ctx, keyValue.Spec.Bucket) }) if errors.Is(err, jetstream.ErrBucketNotFound) { log.Info("KeyValue does not exist, unable to delete.", "keyValueName", keyValue.Spec.Bucket) + } else if err != nil && storedState == nil { + log.Info("KeyValue not reconciled and no state received from server. Removing finalizer.") } else if err != nil { return fmt.Errorf("delete keyvalue during finalization: %w", err) } @@ -152,13 +176,6 @@ func (r *KeyValueReconciler) deleteKeyValue(ctx context.Context, log logr.Logger func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger, keyValue *api.KeyValue) error { // Create or Update the KeyValue based on the spec - if r.ReadOnly() { - log.Info("Skipping KeyValue creation or update.", - "read-only", r.ReadOnly(), - ) - return nil - } - // Map spec to KeyValue targetConfig targetConfig, err := keyValueSpecToConfig(&keyValue.Spec) if err != nil { @@ -166,37 +183,83 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger } // UpdateKeyValue is called on every reconciliation when the stream is not to be deleted. - // TODO(future-feature): Do we need to check if config differs? - err = r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error { - exists := false - _, err := js.KeyValue(ctx, targetConfig.Bucket) - if err == nil { - exists = true - } else if !errors.Is(err, jetstream.ErrBucketNotFound) { - return err + err = r.WithJetStreamClient(keyValue.Spec.ConnectionOpts, keyValue.Namespace, func(js jetstream.JetStream) error { + storedState, err := getStoredKeyValueState(keyValue) + if err != nil { + log.Error(err, "Failed to fetch stored KeyValue state") } - if !exists { - log.Info("Creating KeyValue.") - _, err = js.CreateKeyValue(ctx, targetConfig) + serverState, err := getServerKeyValueState(ctx, js, keyValue) + if err != nil { return err } - if !keyValue.Spec.PreventUpdate { + // Check against known state. Skip Update if converged. + // Storing returned state from the server avoids have to + // check default values or call Update on already converged resources + if storedState != nil && serverState != nil && keyValue.Status.ObservedGeneration == keyValue.Generation { + diff := compareConfigState(storedState, serverState) + + if diff == "" { + return nil + } + + log.Info("KeyValue config drifted from desired state.", "diff", diff) + } + + if r.ReadOnly() { + log.Info("Skipping KeyValue creation or update.", + "read-only", r.ReadOnly(), + ) + return nil + } + + var updatedKeyValue jetstream.KeyValue + err = nil + + if serverState == nil { + log.Info("Creating KeyValue.") + updatedKeyValue, err = js.CreateKeyValue(ctx, targetConfig) + if err != nil { + return err + } + } else if !keyValue.Spec.PreventUpdate { log.Info("Updating KeyValue.") - _, err = js.UpdateKeyValue(ctx, targetConfig) - return err + updatedKeyValue, err = js.UpdateKeyValue(ctx, targetConfig) + if err != nil { + return err + } } else { log.Info("Skipping KeyValue update.", "preventUpdate", keyValue.Spec.PreventUpdate, ) } + if updatedKeyValue != nil { + // Store known state in annotation + serverState, err = getServerKeyValueState(ctx, js, keyValue) + if err != nil { + return err + } + + updatedState, err := json.Marshal(serverState) + if err != nil { + return err + } + + if keyValue.Annotations == nil { + keyValue.Annotations = map[string]string{} + } + keyValue.Annotations[stateAnnotationKV] = string(updatedState) + + return r.Update(ctx, keyValue) + } + return nil }) if err != nil { err = fmt.Errorf("create or update keyvalue: %w", err) - keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error()) if err := r.Status().Update(ctx, keyValue); err != nil { log.Error(err, "Failed to update ready condition to Errored.") } @@ -208,7 +271,7 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger keyValue.Status.Conditions = updateReadyCondition( keyValue.Status.Conditions, v1.ConditionTrue, - "Reconciling", + stateReady, "KeyValue successfully created or updated.", ) err = r.Status().Update(ctx, keyValue) @@ -219,15 +282,30 @@ func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger return nil } -// keyValueConnOpts extracts nats connection relevant fields from the given KeyValue spec as connectionOptions. -func keyValueConnOpts(spec api.KeyValueSpec) *connectionOptions { - return &connectionOptions{ - Account: spec.Account, - Creds: spec.Creds, - Nkey: spec.Nkey, - Servers: spec.Servers, - TLS: spec.TLS, +func getStoredKeyValueState(keyValue *api.KeyValue) (*jetstream.StreamConfig, error) { + var storedState *jetstream.StreamConfig + if state, ok := keyValue.Annotations[stateAnnotationKV]; ok { + err := json.Unmarshal([]byte(state), &storedState) + if err != nil { + return nil, err + } } + + return storedState, nil +} + +// Fetch the current state of the KeyValue stream from the server. +// ErrStreamNotFound is considered a valid response and does not return error +func getServerKeyValueState(ctx context.Context, js jetstream.JetStream, keyValue *api.KeyValue) (*jetstream.StreamConfig, error) { + s, err := js.Stream(ctx, fmt.Sprintf("%s%s", kvStreamPrefix, keyValue.Spec.Bucket)) + if errors.Is(err, jetstream.ErrStreamNotFound) { + return nil, nil + } + if err != nil { + return nil, err + } + + return &s.CachedInfo().Config, nil } // keyValueSpecToConfig creates a jetstream.KeyValueConfig matching the given KeyValue resource spec @@ -254,7 +332,7 @@ func keyValueSpecToConfig(spec *api.KeyValueSpec) (jetstream.KeyValueConfig, err // storage if spec.Storage != "" { - err := config.Storage.UnmarshalJSON(asJsonString(spec.Storage)) + err := config.Storage.UnmarshalJSON(jsonString(spec.Storage)) if err != nil { return jetstream.KeyValueConfig{}, fmt.Errorf("invalid storage: %w", err) } @@ -305,8 +383,9 @@ func keyValueSpecToConfig(spec *api.KeyValueSpec) (jetstream.KeyValueConfig, err func (r *KeyValueReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&api.KeyValue{}). - Owns(&api.KeyValue{}). - // Only trigger on generation changes WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). Complete(r) } diff --git a/internal/controller/keyvalue_controller_test.go b/internal/controller/keyvalue_controller_test.go index 0333f9f1..82cac273 100644 --- a/internal/controller/keyvalue_controller_test.go +++ b/internal/controller/keyvalue_controller_test.go @@ -86,7 +86,8 @@ var _ = Describe("KeyValue Controller", func() { By("setting up the tested controller") controller = &KeyValueReconciler{ - baseController, + Scheme: k8sClient.Scheme(), + JetStreamController: baseController, } }) @@ -154,7 +155,7 @@ var _ = Describe("KeyValue Controller", func() { Expect(k8sClient.Get(ctx, typeNamespacedName, keyValue)).To(Succeed()) Expect(keyValue.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(keyValue.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + assertReadyStateMatches(keyValue.Status.Conditions[0], v1.ConditionUnknown, stateReconciling, "Starting reconciliation", time.Now()) }) }) @@ -188,7 +189,7 @@ var _ = Describe("KeyValue Controller", func() { By("checking if the ready state was updated") Expect(keyValue.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(keyValue.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + assertReadyStateMatches(keyValue.Status.Conditions[0], v1.ConditionTrue, stateReady, "created or updated", time.Now()) By("checking if the observed generation matches") Expect(keyValue.Status.ObservedGeneration).To(Equal(keyValue.Generation)) @@ -243,9 +244,10 @@ var _ = Describe("KeyValue Controller", func() { When("read-only mode is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &KeyValueReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -283,9 +285,10 @@ var _ = Describe("KeyValue Controller", func() { When("namespace restriction is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting a namespace on the resource") - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &KeyValueReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -368,12 +371,13 @@ var _ = Describe("KeyValue Controller", func() { // Setup client for not running server // Use actual test server to ensure port not used by other service on test instance sv := CreateTestServer() - base, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) + disconnectedController, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) Expect(err).NotTo(HaveOccurred()) sv.Shutdown() controller := &KeyValueReconciler{ - base, + Scheme: k8sClient.Scheme(), + JetStreamController: disconnectedController, } By("reconciling resource") @@ -392,7 +396,7 @@ var _ = Describe("KeyValue Controller", func() { assertReadyStateMatches( keyValue.Status.Conditions[0], v1.ConditionFalse, - "Errored", + stateErrored, "create or update keyvalue:", time.Now(), ) @@ -476,9 +480,10 @@ var _ = Describe("KeyValue Controller", func() { When("read only is set", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &KeyValueReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -501,9 +506,10 @@ var _ = Describe("KeyValue Controller", func() { When("controller is restricted to different namespace", func() { BeforeEach(func(ctx SpecContext) { - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &KeyValueReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -623,13 +629,15 @@ func Test_mapKVSpecToConfig(t *testing.T) { }}, Storage: "memory", BaseStreamConfig: api.BaseStreamConfig{ - Account: "", - Creds: "", PreventDelete: false, PreventUpdate: false, - Nkey: "", - Servers: nil, - TLS: api.TLS{}, + ConnectionOpts: api.ConnectionOpts{ + Account: "", + Creds: "", + Nkey: "", + Servers: nil, + TLS: api.TLS{}, + }, }, }, want: jetstream.KeyValueConfig{ diff --git a/internal/controller/objectstore_controller.go b/internal/controller/objectstore_controller.go index 1d32f6f6..4f88fb69 100644 --- a/internal/controller/objectstore_controller.go +++ b/internal/controller/objectstore_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -27,14 +28,22 @@ import ( "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) +const ( + objStreamPrefix = "OBJ_" +) + // ObjectStoreReconciler reconciles a ObjectStore object type ObjectStoreReconciler struct { + Scheme *runtime.Scheme + JetStreamController } @@ -61,7 +70,7 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) objectStore := &api.ObjectStore{} if err := r.Get(ctx, req.NamespacedName, objectStore); err != nil { if apierrors.IsNotFound(err) { - log.Info("ObjectStore resource not found. Ignoring since object must be deleted.") + log.Info("ObjectStore deleted.", "objectStoreName", req.NamespacedName.String()) return ctrl.Result{}, nil } return ctrl.Result{}, fmt.Errorf("get objectstore resource '%s': %w", req.NamespacedName.String(), err) @@ -72,7 +81,7 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Update ready status to unknown when no status is set if len(objectStore.Status.Conditions) == 0 { log.Info("Setting initial ready condition to unknown.") - objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation") err := r.Status().Update(ctx, objectStore) if err != nil { return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) @@ -80,19 +89,6 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{Requeue: true}, nil } - // Add finalizer - if !controllerutil.ContainsFinalizer(objectStore, objectStoreFinalizer) { - log.Info("Adding ObjectStore finalizer.") - if ok := controllerutil.AddFinalizer(objectStore, objectStoreFinalizer); !ok { - return ctrl.Result{}, errors.New("failed to add finalizer to objectstore resource") - } - - if err := r.Update(ctx, objectStore); err != nil { - return ctrl.Result{}, fmt.Errorf("update objectstore resource to add finalizer: %w", err) - } - return ctrl.Result{}, nil - } - // Check Deletion markedForDeletion := objectStore.GetDeletionTimestamp() != nil if markedForDeletion { @@ -108,28 +104,56 @@ func (r *ObjectStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } + // Add finalizer + if !controllerutil.ContainsFinalizer(objectStore, objectStoreFinalizer) { + log.Info("Adding ObjectStore finalizer.") + if ok := controllerutil.AddFinalizer(objectStore, objectStoreFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to objectstore resource") + } + + if err := r.Update(ctx, objectStore); err != nil { + return ctrl.Result{}, fmt.Errorf("update objectstore resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + // Create or update ObjectStore if err := r.createOrUpdate(ctx, log, objectStore); err != nil { return ctrl.Result{}, fmt.Errorf("create or update: %s", err) } - return ctrl.Result{}, nil + + return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil } func (r *ObjectStoreReconciler) deleteObjectStore(ctx context.Context, log logr.Logger, objectStore *api.ObjectStore) error { // Set status to false - objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionFalse, stateFinalizing, "Performing finalizer operations.") if err := r.Status().Update(ctx, objectStore); err != nil { return fmt.Errorf("update ready condition: %w", err) } + storedState, err := getStoredObjectStoreState(objectStore) + if err != nil { + log.Error(err, "Failed to fetch stored state.") + } + if !objectStore.Spec.PreventDelete && !r.ReadOnly() { log.Info("Deleting ObjectStore.") - err := r.WithJetStreamClient(objectStoreConnOpts(objectStore.Spec), func(js jetstream.JetStream) error { + err := r.WithJetStreamClient(objectStore.Spec.ConnectionOpts, objectStore.Namespace, func(js jetstream.JetStream) error { + _, err := getServerObjectStoreState(ctx, js, objectStore) + // If we have no known state for this object store it has never been reconciled. + // If we are also receiving an error fetching state, either the object store does not exist + // or this resource config is invalid. + if err != nil && storedState == nil { + return nil + } + return js.DeleteObjectStore(ctx, objectStore.Spec.Bucket) }) - // FIX: ErrStreamNotFound -> ErrBucketNotFound once nats.go is corrected - if errors.Is(err, jetstream.ErrStreamNotFound) { + if errors.Is(err, jetstream.ErrStreamNotFound) || errors.Is(err, jetstream.ErrBucketNotFound) { log.Info("ObjectStore does not exist, unable to delete.", "objectStoreName", objectStore.Spec.Bucket) + } else if err != nil && storedState == nil { + log.Info("ObjectStore not reconciled and no state received from server. Removing finalizer.") } else if err != nil { return fmt.Errorf("delete objectstore during finalization: %w", err) } @@ -153,13 +177,6 @@ func (r *ObjectStoreReconciler) deleteObjectStore(ctx context.Context, log logr. func (r *ObjectStoreReconciler) createOrUpdate(ctx context.Context, log logr.Logger, objectStore *api.ObjectStore) error { // Create or Update the ObjectStore based on the spec - if r.ReadOnly() { - log.Info("Skipping ObjectStore creation or update.", - "read-only", r.ReadOnly(), - ) - return nil - } - // Map spec to ObjectStore targetConfig targetConfig, err := objectStoreSpecToConfig(&objectStore.Spec) if err != nil { @@ -167,37 +184,83 @@ func (r *ObjectStoreReconciler) createOrUpdate(ctx context.Context, log logr.Log } // UpdateObjectStore is called on every reconciliation when the stream is not to be deleted. - // TODO(future-feature): Do we need to check if config differs? - err = r.WithJetStreamClient(objectStoreConnOpts(objectStore.Spec), func(js jetstream.JetStream) error { - exists := false - _, err := js.ObjectStore(ctx, targetConfig.Bucket) - if err == nil { - exists = true - } else if !errors.Is(err, jetstream.ErrBucketNotFound) { - return err + err = r.WithJetStreamClient(objectStore.Spec.ConnectionOpts, objectStore.Namespace, func(js jetstream.JetStream) error { + storedState, err := getStoredObjectStoreState(objectStore) + if err != nil { + log.Error(err, "Failed to fetch stored objectstore state") } - if !exists { - log.Info("Creating ObjectStore.") - _, err = js.CreateObjectStore(ctx, targetConfig) + serverState, err := getServerObjectStoreState(ctx, js, objectStore) + if err != nil { return err } - if !objectStore.Spec.PreventUpdate { + // Check against known state. Skip Update if converged. + // Storing returned state from the server avoids have to + // check default values or call Update on already converged resources + if storedState != nil && serverState != nil && objectStore.Status.ObservedGeneration == objectStore.Generation { + diff := compareConfigState(storedState, serverState) + + if diff == "" { + return nil + } + + log.Info("Object Store config drifted from desired state.", "diff", diff) + } + + if r.ReadOnly() { + log.Info("Skipping ObjectStore creation or update.", + "read-only", r.ReadOnly(), + ) + return nil + } + + var updatedObjectStore jetstream.ObjectStore + err = nil + + if serverState == nil { + log.Info("Creating ObjectStore.") + updatedObjectStore, err = js.CreateObjectStore(ctx, targetConfig) + if err != nil { + return err + } + } else if !objectStore.Spec.PreventUpdate { log.Info("Updating ObjectStore.") - _, err = js.UpdateObjectStore(ctx, targetConfig) - return err + updatedObjectStore, err = js.UpdateObjectStore(ctx, targetConfig) + if err != nil { + return err + } } else { log.Info("Skipping ObjectStore update.", "preventUpdate", objectStore.Spec.PreventUpdate, ) } + if updatedObjectStore != nil { + // Store known state in annotation + serverState, err = getServerObjectStoreState(ctx, js, objectStore) + if err != nil { + return err + } + + updatedState, err := json.Marshal(serverState) + if err != nil { + return err + } + + if objectStore.Annotations == nil { + objectStore.Annotations = map[string]string{} + } + objectStore.Annotations[stateAnnotationObj] = string(updatedState) + + return r.Update(ctx, objectStore) + } + return nil }) if err != nil { err = fmt.Errorf("create or update objectstore: %w", err) - objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + objectStore.Status.Conditions = updateReadyCondition(objectStore.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error()) if err := r.Status().Update(ctx, objectStore); err != nil { log.Error(err, "Failed to update ready condition to Errored.") } @@ -209,7 +272,7 @@ func (r *ObjectStoreReconciler) createOrUpdate(ctx context.Context, log logr.Log objectStore.Status.Conditions = updateReadyCondition( objectStore.Status.Conditions, v1.ConditionTrue, - "Reconciling", + stateReady, "ObjectStore successfully created or updated.", ) err = r.Status().Update(ctx, objectStore) @@ -220,15 +283,30 @@ func (r *ObjectStoreReconciler) createOrUpdate(ctx context.Context, log logr.Log return nil } -// objectStoreConnOpts extracts nats connection relevant fields from the given ObjectStore spec as connectionOptions. -func objectStoreConnOpts(spec api.ObjectStoreSpec) *connectionOptions { - return &connectionOptions{ - Account: spec.Account, - Creds: spec.Creds, - Nkey: spec.Nkey, - Servers: spec.Servers, - TLS: spec.TLS, +func getStoredObjectStoreState(objectStore *api.ObjectStore) (*jetstream.StreamConfig, error) { + var storedState *jetstream.StreamConfig + if state, ok := objectStore.Annotations[stateAnnotationObj]; ok { + err := json.Unmarshal([]byte(state), &storedState) + if err != nil { + return nil, err + } } + + return storedState, nil +} + +// Fetch the current state of the ObjectStore stream from the server. +// ErrStreamNotFound is considered a valid response and does not return error +func getServerObjectStoreState(ctx context.Context, js jetstream.JetStream, objectStore *api.ObjectStore) (*jetstream.StreamConfig, error) { + s, err := js.Stream(ctx, fmt.Sprintf("%s%s", objStreamPrefix, objectStore.Spec.Bucket)) + if errors.Is(err, jetstream.ErrStreamNotFound) { + return nil, nil + } + if err != nil { + return nil, err + } + + return &s.CachedInfo().Config, nil } // objectStoreSpecToConfig creates a jetstream.ObjectStoreConfig matching the given ObjectStore resource spec @@ -254,7 +332,7 @@ func objectStoreSpecToConfig(spec *api.ObjectStoreSpec) (jetstream.ObjectStoreCo // storage if spec.Storage != "" { - err := config.Storage.UnmarshalJSON(asJsonString(spec.Storage)) + err := config.Storage.UnmarshalJSON(jsonString(spec.Storage)) if err != nil { return jetstream.ObjectStoreConfig{}, fmt.Errorf("invalid storage: %w", err) } @@ -275,8 +353,9 @@ func objectStoreSpecToConfig(spec *api.ObjectStoreSpec) (jetstream.ObjectStoreCo func (r *ObjectStoreReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&api.ObjectStore{}). - Owns(&api.ObjectStore{}). - // Only trigger on generation changes WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). Complete(r) } diff --git a/internal/controller/objectstore_controller_test.go b/internal/controller/objectstore_controller_test.go index a2ecabf1..e3a0d999 100644 --- a/internal/controller/objectstore_controller_test.go +++ b/internal/controller/objectstore_controller_test.go @@ -85,7 +85,8 @@ var _ = Describe("ObjectStore Controller", func() { By("setting up the tested controller") controller = &ObjectStoreReconciler{ - baseController, + Scheme: k8sClient.Scheme(), + JetStreamController: baseController, } }) @@ -152,7 +153,7 @@ var _ = Describe("ObjectStore Controller", func() { Expect(k8sClient.Get(ctx, typeNamespacedName, objectStore)).To(Succeed()) Expect(objectStore.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(objectStore.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + assertReadyStateMatches(objectStore.Status.Conditions[0], v1.ConditionUnknown, stateReconciling, "Starting reconciliation", time.Now()) }) }) @@ -186,7 +187,7 @@ var _ = Describe("ObjectStore Controller", func() { By("checking if the ready state was updated") Expect(objectStore.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(objectStore.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + assertReadyStateMatches(objectStore.Status.Conditions[0], v1.ConditionTrue, stateReady, "created or updated", time.Now()) By("checking if the observed generation matches") Expect(objectStore.Status.ObservedGeneration).To(Equal(objectStore.Generation)) @@ -239,9 +240,10 @@ var _ = Describe("ObjectStore Controller", func() { When("read-only mode is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &ObjectStoreReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -278,9 +280,10 @@ var _ = Describe("ObjectStore Controller", func() { When("namespace restriction is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting a namespace on the resource") - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &ObjectStoreReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -360,12 +363,13 @@ var _ = Describe("ObjectStore Controller", func() { // Setup client for not running server // Use actual test server to ensure port not used by other service on test instance sv := CreateTestServer() - base, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) + disconnectedController, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) Expect(err).NotTo(HaveOccurred()) sv.Shutdown() controller := &ObjectStoreReconciler{ - base, + Scheme: k8sClient.Scheme(), + JetStreamController: disconnectedController, } By("reconciling resource") @@ -384,7 +388,7 @@ var _ = Describe("ObjectStore Controller", func() { assertReadyStateMatches( objectStore.Status.Conditions[0], v1.ConditionFalse, - "Errored", + stateErrored, "create or update objectstore:", time.Now(), ) @@ -468,9 +472,10 @@ var _ = Describe("ObjectStore Controller", func() { When("read only is set", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &ObjectStoreReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -493,9 +498,10 @@ var _ = Describe("ObjectStore Controller", func() { When("controller is restricted to different namespace", func() { BeforeEach(func(ctx SpecContext) { - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &ObjectStoreReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -584,13 +590,15 @@ func Test_mapobjectstoreSpecToConfig(t *testing.T) { "foo": "bar", }, BaseStreamConfig: api.BaseStreamConfig{ - Account: "", - Creds: "", PreventDelete: false, PreventUpdate: false, - Nkey: "", - Servers: nil, - TLS: api.TLS{}, + ConnectionOpts: api.ConnectionOpts{ + Account: "", + Creds: "", + Nkey: "", + Servers: nil, + TLS: api.TLS{}, + }, }, }, want: jetstream.ObjectStoreConfig{ diff --git a/internal/controller/register.go b/internal/controller/register.go index 8c5c8def..d51696d6 100644 --- a/internal/controller/register.go +++ b/internal/controller/register.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "time" ctrl "sigs.k8s.io/controller-runtime" ) @@ -12,33 +13,54 @@ import ( // // Namespace restricts the controller to resources of the given namespace. type Config struct { - ReadOnly bool - Namespace string + ReadOnly bool + Namespace string + RequeueInterval time.Duration + CacheDir string } // RegisterAll registers all available jetStream controllers to the manager. // natsCfg is specific to the nats server connection. // controllerCfg defines behaviour of the registered controllers. func RegisterAll(mgr ctrl.Manager, clientConfig *NatsConfig, config *Config) error { + scheme := mgr.GetScheme() + + // Register controllers baseController, err := NewJSController(mgr.GetClient(), clientConfig, config) if err != nil { return fmt.Errorf("create base jetstream controller: %w", err) } - // Register controllers if err := (&AccountReconciler{ - baseController, + Scheme: scheme, + JetStreamController: baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create account controller: %w", err) } if err := (&ConsumerReconciler{ - baseController, + Scheme: scheme, + JetStreamController: baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create consumer controller: %w", err) } + if err := (&KeyValueReconciler{ + Scheme: scheme, + JetStreamController: baseController, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to create key-value controller: %w", err) + } + + if err := (&ObjectStoreReconciler{ + Scheme: scheme, + JetStreamController: baseController, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("unable to create object store controller: %w", err) + } + if err := (&StreamReconciler{ + Scheme: scheme, JetStreamController: baseController, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create stream controller: %w", err) diff --git a/internal/controller/stream_controller.go b/internal/controller/stream_controller.go index 511b9e72..49d18554 100644 --- a/internal/controller/stream_controller.go +++ b/internal/controller/stream_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -27,14 +28,18 @@ import ( "github.com/nats-io/nats.go/jetstream" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) // StreamReconciler reconciles a Stream object type StreamReconciler struct { + Scheme *runtime.Scheme + JetStreamController } @@ -61,7 +66,7 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr stream := &api.Stream{} if err := r.Get(ctx, req.NamespacedName, stream); err != nil { if apierrors.IsNotFound(err) { - log.Info("Stream resource not found. Ignoring since object must be deleted.") + log.Info("Stream deleted.", "streamName", req.NamespacedName.String()) return ctrl.Result{}, nil } return ctrl.Result{}, fmt.Errorf("get stream resource '%s': %w", req.NamespacedName.String(), err) @@ -72,7 +77,7 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Update ready status to unknown when no status is set if len(stream.Status.Conditions) == 0 { log.Info("Setting initial ready condition to unknown.") - stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation") + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation") err := r.Status().Update(ctx, stream) if err != nil { return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err) @@ -80,19 +85,6 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{Requeue: true}, nil } - // Add finalizer - if !controllerutil.ContainsFinalizer(stream, streamFinalizer) { - log.Info("Adding stream finalizer.") - if ok := controllerutil.AddFinalizer(stream, streamFinalizer); !ok { - return ctrl.Result{}, errors.New("failed to add finalizer to stream resource") - } - - if err := r.Update(ctx, stream); err != nil { - return ctrl.Result{}, fmt.Errorf("update stream resource to add finalizer: %w", err) - } - return ctrl.Result{}, nil - } - // Check Deletion markedForDeletion := stream.GetDeletionTimestamp() != nil if markedForDeletion { @@ -108,27 +100,56 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } + // Add finalizer + if !controllerutil.ContainsFinalizer(stream, streamFinalizer) { + log.Info("Adding stream finalizer.") + if ok := controllerutil.AddFinalizer(stream, streamFinalizer); !ok { + return ctrl.Result{}, errors.New("failed to add finalizer to stream resource") + } + + if err := r.Update(ctx, stream); err != nil { + return ctrl.Result{}, fmt.Errorf("update stream resource to add finalizer: %w", err) + } + return ctrl.Result{}, nil + } + // Create or update stream if err := r.createOrUpdate(ctx, log, stream); err != nil { return ctrl.Result{}, fmt.Errorf("create or update: %s", err) } - return ctrl.Result{}, nil + + return ctrl.Result{RequeueAfter: r.RequeueInterval()}, nil } func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, stream *api.Stream) error { // Set status to false - stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.") + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, stateFinalizing, "Performing finalizer operations.") if err := r.Status().Update(ctx, stream); err != nil { return fmt.Errorf("update ready condition: %w", err) } + storedState, err := getStoredStreamState(stream) + if err != nil { + log.Error(err, "Failed to fetch stored state.") + } + if !stream.Spec.PreventDelete && !r.ReadOnly() { log.Info("Deleting stream.") - err := r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error { + err := r.WithJetStreamClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js jetstream.JetStream) error { + _, err := getServerStreamState(ctx, js, stream) + // If we have no known state for this stream it has never been reconciled. + // If we are also receiving an error fetching state, either the stream does not exist + // or this resource config is invalid. + if err != nil && storedState == nil { + return nil + } + return js.DeleteStream(ctx, stream.Spec.Name) }) if errors.Is(err, jetstream.ErrStreamNotFound) { log.Info("Stream does not exist, unable to delete.", "streamName", stream.Spec.Name) + } else if err != nil && storedState == nil { + log.Info("Stream not reconciled and no state received from server. Removing finalizer.") } else if err != nil { return fmt.Errorf("delete stream during finalization: %w", err) } @@ -152,13 +173,6 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error { // Create or Update the stream based on the spec - if r.ReadOnly() { - log.Info("Skipping stream creation or update.", - "read-only", r.ReadOnly(), - ) - return nil - } - // Map spec to stream targetConfig targetConfig, err := streamSpecToConfig(&stream.Spec) if err != nil { @@ -166,37 +180,78 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, } // CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted. - // TODO(future-feature): Do we need to check if config differs? - err = r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error { - exists := false - _, err := js.Stream(ctx, targetConfig.Name) - if err == nil { - exists = true - } else if !errors.Is(err, jetstream.ErrStreamNotFound) { - return err + err = r.WithJetStreamClient(stream.Spec.ConnectionOpts, stream.Namespace, func(js jetstream.JetStream) error { + storedState, err := getStoredStreamState(stream) + if err != nil { + log.Error(err, "Failed to fetch stored stream state") } - if !exists { - log.Info("Creating Stream.") - _, err := js.CreateStream(ctx, targetConfig) + serverState, err := getServerStreamState(ctx, js, stream) + if err != nil { return err } - if !stream.Spec.PreventUpdate { + // Check against known state. Skip Update if converged. + // Storing returned state from the server avoids have to + // check default values or call Update on already converged resources + if storedState != nil && serverState != nil && stream.Status.ObservedGeneration == stream.Generation { + diff := compareConfigState(storedState, serverState) + + if diff == "" { + return nil + } + + log.Info("Stream config drifted from desired state.", "diff", diff) + } + + if r.ReadOnly() { + log.Info("Skipping stream creation or update.", + "read-only", r.ReadOnly(), + ) + return nil + } + + var updatedStream jetstream.Stream + err = nil + + if serverState == nil { + log.Info("Creating Stream.") + updatedStream, err = js.CreateStream(ctx, targetConfig) + if err != nil { + return err + } + } else if !stream.Spec.PreventUpdate { log.Info("Updating Stream.") - _, err := js.UpdateStream(ctx, targetConfig) - return err + updatedStream, err = js.UpdateStream(ctx, targetConfig) + if err != nil { + return err + } } else { log.Info("Skipping Stream update.", "preventUpdate", stream.Spec.PreventUpdate, ) } + if updatedStream != nil { + // Store known state in annotation + updatedState, err := json.Marshal(updatedStream.CachedInfo().Config) + if err != nil { + return err + } + + if stream.Annotations == nil { + stream.Annotations = map[string]string{} + } + stream.Annotations[stateAnnotationStream] = string(updatedState) + + return r.Update(ctx, stream) + } + return nil }) if err != nil { err = fmt.Errorf("create or update stream: %w", err) - stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, "Errored", err.Error()) + stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error()) if err := r.Status().Update(ctx, stream); err != nil { log.Error(err, "Failed to update ready condition to Errored.") } @@ -208,7 +263,7 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream.Status.Conditions = updateReadyCondition( stream.Status.Conditions, v1.ConditionTrue, - "Reconciling", + stateReady, "Stream successfully created or updated.", ) err = r.Status().Update(ctx, stream) @@ -219,15 +274,30 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, return nil } -// streamConnOpts extracts nats connection relevant fields from the given stream spec as connectionOptions. -func streamConnOpts(spec api.StreamSpec) *connectionOptions { - return &connectionOptions{ - Account: spec.Account, - Creds: spec.Creds, - Nkey: spec.Nkey, - Servers: spec.Servers, - TLS: spec.TLS, +func getStoredStreamState(stream *api.Stream) (*jetstream.StreamConfig, error) { + var storedState *jetstream.StreamConfig + if state, ok := stream.Annotations[stateAnnotationStream]; ok { + err := json.Unmarshal([]byte(state), &storedState) + if err != nil { + return nil, err + } } + + return storedState, nil +} + +// Fetch the current state of the stream from the server. +// ErrStreamNotFound is considered a valid response and does not return error +func getServerStreamState(ctx context.Context, js jetstream.JetStream, stream *api.Stream) (*jetstream.StreamConfig, error) { + s, err := js.Stream(ctx, stream.Spec.Name) + if errors.Is(err, jetstream.ErrStreamNotFound) { + return nil, nil + } + if err != nil { + return nil, err + } + + return &s.CachedInfo().Config, nil } // streamSpecToConfig creates a jetstream.StreamConfig matching the given stream resource spec @@ -260,7 +330,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { // retention if spec.Retention != "" { // Wrap string in " to be properly unmarshalled as json string - err := config.Retention.UnmarshalJSON(asJsonString(spec.Retention)) + err := config.Retention.UnmarshalJSON(jsonString(spec.Retention)) if err != nil { return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) } @@ -268,7 +338,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { // discard if spec.Discard != "" { - err := config.Discard.UnmarshalJSON(asJsonString(spec.Discard)) + err := config.Discard.UnmarshalJSON(jsonString(spec.Discard)) if err != nil { return jetstream.StreamConfig{}, fmt.Errorf("invalid retention policy: %w", err) } @@ -284,7 +354,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { } // storage if spec.Storage != "" { - err := config.Storage.UnmarshalJSON(asJsonString(spec.Storage)) + err := config.Storage.UnmarshalJSON(jsonString(spec.Storage)) if err != nil { return jetstream.StreamConfig{}, fmt.Errorf("invalid storage: %w", err) } @@ -330,7 +400,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) { // compression if spec.Compression != "" { - err := config.Compression.UnmarshalJSON(asJsonString(spec.Compression)) + err := config.Compression.UnmarshalJSON(jsonString(spec.Compression)) if err != nil { return jetstream.StreamConfig{}, fmt.Errorf("invalid compression: %w", err) } @@ -406,8 +476,9 @@ func mapStreamSource(ss *api.StreamSource) (*jetstream.StreamSource, error) { func (r *StreamReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&api.Stream{}). - Owns(&api.Stream{}). - // Only trigger on generation changes WithEventFilter(predicate.GenerationChangedPredicate{}). + WithOptions(controller.Options{ + MaxConcurrentReconciles: 1, + }). Complete(r) } diff --git a/internal/controller/stream_controller_test.go b/internal/controller/stream_controller_test.go index 8ba50b1b..b9ac6c6b 100644 --- a/internal/controller/stream_controller_test.go +++ b/internal/controller/stream_controller_test.go @@ -88,7 +88,8 @@ var _ = Describe("Stream Controller", func() { By("setting up the tested controller") controller = &StreamReconciler{ - baseController, + Scheme: k8sClient.Scheme(), + JetStreamController: baseController, } }) @@ -156,7 +157,7 @@ var _ = Describe("Stream Controller", func() { Expect(k8sClient.Get(ctx, typeNamespacedName, stream)).To(Succeed()) Expect(stream.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now()) + assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionUnknown, stateReconciling, "Starting reconciliation", time.Now()) }) }) @@ -190,7 +191,7 @@ var _ = Describe("Stream Controller", func() { By("checking if the ready state was updated") Expect(stream.Status.Conditions).To(HaveLen(1)) - assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now()) + assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionTrue, stateReady, "created or updated", time.Now()) By("checking if the observed generation matches") Expect(stream.Status.ObservedGeneration).To(Equal(stream.Generation)) @@ -254,9 +255,10 @@ var _ = Describe("Stream Controller", func() { When("read-only mode is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &StreamReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -291,9 +293,10 @@ var _ = Describe("Stream Controller", func() { When("namespace restriction is enabled", func() { BeforeEach(func(ctx SpecContext) { By("setting a namespace on the resource") - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &StreamReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -372,12 +375,13 @@ var _ = Describe("Stream Controller", func() { // Setup client for not running server // Use actual test server to ensure port not used by other service on test instance sv := CreateTestServer() - base, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) + disconnectedController, err := NewJSController(k8sClient, &NatsConfig{ServerURL: sv.ClientURL()}, &Config{}) Expect(err).NotTo(HaveOccurred()) sv.Shutdown() controller := &StreamReconciler{ - base, + Scheme: k8sClient.Scheme(), + JetStreamController: disconnectedController, } By("reconciling resource") @@ -396,7 +400,7 @@ var _ = Describe("Stream Controller", func() { assertReadyStateMatches( stream.Status.Conditions[0], v1.ConditionFalse, - "Errored", + stateErrored, "create or update stream:", time.Now(), ) @@ -480,9 +484,10 @@ var _ = Describe("Stream Controller", func() { When("read only is set", func() { BeforeEach(func(ctx SpecContext) { By("setting read only on the controller") - readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true}) + readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true}) Expect(err).NotTo(HaveOccurred()) controller = &StreamReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: readOnly, } }) @@ -505,9 +510,10 @@ var _ = Describe("Stream Controller", func() { When("controller is restricted to different namespace", func() { BeforeEach(func(ctx SpecContext) { - namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"}) + namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"}) Expect(err).NotTo(HaveOccurred()) controller = &StreamReconciler{ + Scheme: k8sClient.Scheme(), JetStreamController: namespaced, } }) @@ -648,13 +654,15 @@ func Test_mapSpecToConfig(t *testing.T) { Storage: "file", Subjects: []string{"orders.*"}, BaseStreamConfig: api.BaseStreamConfig{ - Account: "", - Creds: "", - Nkey: "", PreventDelete: false, PreventUpdate: false, - Servers: nil, - TLS: api.TLS{}, + ConnectionOpts: api.ConnectionOpts{ + Account: "", + Creds: "", + Nkey: "", + Servers: nil, + TLS: api.TLS{}, + }, }, }, want: jetstream.StreamConfig{ diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 75bf60f5..a445def9 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -36,7 +36,7 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" + api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to @@ -47,6 +47,7 @@ var ( k8sClient client.Client testEnv *envtest.Environment testServer *server.Server + clientUrl string jsClient jetstream.JetStream baseController JetStreamController ) @@ -80,7 +81,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) - err = jetstreamnatsiov1beta2.AddToScheme(scheme.Scheme) + err = api.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) @@ -91,7 +92,8 @@ var _ = BeforeSuite(func() { testServer = CreateTestServer() Expect(err).NotTo(HaveOccurred()) - testNatsConfig := &NatsConfig{ServerURL: testServer.ClientURL()} + clientUrl = testServer.ClientURL() + testNatsConfig := &NatsConfig{ServerURL: clientUrl} baseController, err = NewJSController(k8sClient, testNatsConfig, &Config{}) Expect(err).NotTo(HaveOccurred()) jsClient, _, err = CreateJetStreamClient(testNatsConfig, true) diff --git a/internal/controller/types.go b/internal/controller/types.go index bd4561de..b9587735 100644 --- a/internal/controller/types.go +++ b/internal/controller/types.go @@ -2,8 +2,19 @@ package controller const ( readyCondType = "Ready" + accountFinalizer = "account.nats.io/finalizer" streamFinalizer = "stream.nats.io/finalizer" keyValueFinalizer = "kv.nats.io/finalizer" objectStoreFinalizer = "objectstore.nats.io/finalizer" consumerFinalizer = "consumer.nats.io/finalizer" + + stateAnnotationConsumer = "consumer.nats.io/state" + stateAnnotationKV = "kv.nats.io/state" + stateAnnotationObj = "objectstore.nats.io/state" + stateAnnotationStream = "stream.nats.io/state" + + stateReady = "Ready" + stateReconciling = "Reconciling" + stateErrored = "Errored" + stateFinalizing = "Finalizing" ) diff --git a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go index d565e5c5..7ec1ec59 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go @@ -22,41 +22,41 @@ func (c *Consumer) GetSpec() interface{} { // ConsumerSpec is the spec for a Consumer resource type ConsumerSpec struct { + DurableName string `json:"durableName"` // Maps to Durable + Description string `json:"description"` + DeliverPolicy string `json:"deliverPolicy"` + OptStartSeq int `json:"optStartSeq"` + OptStartTime string `json:"optStartTime"` AckPolicy string `json:"ackPolicy"` AckWait string `json:"ackWait"` + MaxDeliver int `json:"maxDeliver"` BackOff []string `json:"backoff"` - Creds string `json:"creds"` - DeliverGroup string `json:"deliverGroup"` - DeliverPolicy string `json:"deliverPolicy"` - DeliverSubject string `json:"deliverSubject"` - Description string `json:"description"` - PreventDelete bool `json:"preventDelete"` - PreventUpdate bool `json:"preventUpdate"` - DurableName string `json:"durableName"` FilterSubject string `json:"filterSubject"` - FilterSubjects []string `json:"filterSubjects"` - FlowControl bool `json:"flowControl"` - HeadersOnly bool `json:"headersOnly"` - HeartbeatInterval string `json:"heartbeatInterval"` + ReplayPolicy string `json:"replayPolicy"` + RateLimitBps int `json:"rateLimitBps"` // Maps to RateLimit + SampleFreq string `json:"sampleFreq"` // Maps to SampleFrequency + MaxWaiting int `json:"maxWaiting"` MaxAckPending int `json:"maxAckPending"` - MaxDeliver int `json:"maxDeliver"` + HeadersOnly bool `json:"headersOnly"` MaxRequestBatch int `json:"maxRequestBatch"` MaxRequestExpires string `json:"maxRequestExpires"` MaxRequestMaxBytes int `json:"maxRequestMaxBytes"` - MaxWaiting int `json:"maxWaiting"` - MemStorage bool `json:"memStorage"` - Nkey string `json:"nkey"` - OptStartSeq int `json:"optStartSeq"` - OptStartTime string `json:"optStartTime"` - RateLimitBps int `json:"rateLimitBps"` - ReplayPolicy string `json:"replayPolicy"` + InactiveThreshold string `json:"inactiveThreshold"` Replicas int `json:"replicas"` - SampleFreq string `json:"sampleFreq"` - Servers []string `json:"servers"` - StreamName string `json:"streamName"` - TLS TLS `json:"tls"` - Account string `json:"account"` + MemStorage bool `json:"memStorage"` // Maps to MemoryStorage + FilterSubjects []string `json:"filterSubjects"` Metadata map[string]string `json:"metadata"` + + // Legacy API options for Push Consumers. + // controller-runtime implementation moves to modern JetStream API over legacy + // which does not support Push Consumers + FlowControl bool `json:"flowControl"` + DeliverSubject string `json:"deliverSubject"` + DeliverGroup string `json:"deliverGroup"` + HeartbeatInterval string `json:"heartbeatInterval"` + + StreamName string `json:"streamName"` + BaseStreamConfig } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/jetstream/apis/jetstream/v1beta2/types.go b/pkg/jetstream/apis/jetstream/v1beta2/types.go index 966389e8..c0d5cd80 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/types.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/types.go @@ -23,13 +23,18 @@ type Condition struct { } type BaseStreamConfig struct { - Account string `json:"account"` - Creds string `json:"creds"` - Nkey string `json:"nkey"` - PreventDelete bool `json:"preventDelete"` - PreventUpdate bool `json:"preventUpdate"` - Servers []string `json:"servers"` - TLS TLS `json:"tls"` + PreventDelete bool `json:"preventDelete"` + PreventUpdate bool `json:"preventUpdate"` + ConnectionOpts +} + +type ConnectionOpts struct { + Account string `json:"account"` + Creds string `json:"creds"` + Nkey string `json:"nkey"` + Servers []string `json:"servers"` + TLS TLS `json:"tls"` + TLSFirst bool `json:"tlsFirst"` } type ConsumerLimits struct { @@ -51,8 +56,8 @@ type TLSSecret struct { } type CredsSecret struct { - File string `json:"file"` - Secret SecretRef `json:"secret"` + File string `json:"file"` + Secret *SecretRef `json:"secret"` } type TokenSecret struct { diff --git a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go index 224a2b4d..f45b2f5c 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go @@ -99,7 +99,7 @@ func (in *AccountSpec) DeepCopyInto(out *AccountSpec) { if in.Creds != nil { in, out := &in.Creds, &out.Creds *out = new(CredsSecret) - **out = **in + (*in).DeepCopyInto(*out) } if in.Token != nil { in, out := &in.Token, &out.Token @@ -127,12 +127,7 @@ func (in *AccountSpec) DeepCopy() *AccountSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BaseStreamConfig) DeepCopyInto(out *BaseStreamConfig) { *out = *in - if in.Servers != nil { - in, out := &in.Servers, &out.Servers - *out = make([]string, len(*in)) - copy(*out, *in) - } - in.TLS.DeepCopyInto(&out.TLS) + in.ConnectionOpts.DeepCopyInto(&out.ConnectionOpts) return } @@ -162,6 +157,28 @@ func (in *Condition) DeepCopy() *Condition { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionOpts) DeepCopyInto(out *ConnectionOpts) { + *out = *in + if in.Servers != nil { + in, out := &in.Servers, &out.Servers + *out = make([]string, len(*in)) + copy(*out, *in) + } + in.TLS.DeepCopyInto(&out.TLS) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionOpts. +func (in *ConnectionOpts) DeepCopy() *ConnectionOpts { + if in == nil { + return nil + } + out := new(ConnectionOpts) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Consumer) DeepCopyInto(out *Consumer) { *out = *in @@ -252,12 +269,6 @@ func (in *ConsumerSpec) DeepCopyInto(out *ConsumerSpec) { *out = make([]string, len(*in)) copy(*out, *in) } - if in.Servers != nil { - in, out := &in.Servers, &out.Servers - *out = make([]string, len(*in)) - copy(*out, *in) - } - in.TLS.DeepCopyInto(&out.TLS) if in.Metadata != nil { in, out := &in.Metadata, &out.Metadata *out = make(map[string]string, len(*in)) @@ -265,6 +276,7 @@ func (in *ConsumerSpec) DeepCopyInto(out *ConsumerSpec) { (*out)[key] = val } } + in.BaseStreamConfig.DeepCopyInto(&out.BaseStreamConfig) return } @@ -297,7 +309,11 @@ func (in *CredentialsSecret) DeepCopy() *CredentialsSecret { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CredsSecret) DeepCopyInto(out *CredsSecret) { *out = *in - out.Secret = in.Secret + if in.Secret != nil { + in, out := &in.Secret, &out.Secret + *out = new(SecretRef) + **out = **in + } return } diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/basestreamconfig.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/basestreamconfig.go index 656d1e50..67ad6902 100644 --- a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/basestreamconfig.go +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/basestreamconfig.go @@ -18,13 +18,8 @@ package v1beta2 // BaseStreamConfigApplyConfiguration represents a declarative configuration of the BaseStreamConfig type for use // with apply. type BaseStreamConfigApplyConfiguration struct { - Account *string `json:"account,omitempty"` - Creds *string `json:"creds,omitempty"` - Nkey *string `json:"nkey,omitempty"` - PreventDelete *bool `json:"preventDelete,omitempty"` - PreventUpdate *bool `json:"preventUpdate,omitempty"` - Servers []string `json:"servers,omitempty"` - TLS *TLSApplyConfiguration `json:"tls,omitempty"` + PreventDelete *bool `json:"preventDelete,omitempty"` + PreventUpdate *bool `json:"preventUpdate,omitempty"` } // BaseStreamConfigApplyConfiguration constructs a declarative configuration of the BaseStreamConfig type for use with @@ -33,30 +28,6 @@ func BaseStreamConfig() *BaseStreamConfigApplyConfiguration { return &BaseStreamConfigApplyConfiguration{} } -// WithAccount sets the Account field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Account field is set to the value of the last call. -func (b *BaseStreamConfigApplyConfiguration) WithAccount(value string) *BaseStreamConfigApplyConfiguration { - b.Account = &value - return b -} - -// WithCreds sets the Creds field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Creds field is set to the value of the last call. -func (b *BaseStreamConfigApplyConfiguration) WithCreds(value string) *BaseStreamConfigApplyConfiguration { - b.Creds = &value - return b -} - -// WithNkey sets the Nkey field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Nkey field is set to the value of the last call. -func (b *BaseStreamConfigApplyConfiguration) WithNkey(value string) *BaseStreamConfigApplyConfiguration { - b.Nkey = &value - return b -} - // WithPreventDelete sets the PreventDelete field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the PreventDelete field is set to the value of the last call. @@ -72,21 +43,3 @@ func (b *BaseStreamConfigApplyConfiguration) WithPreventUpdate(value bool) *Base b.PreventUpdate = &value return b } - -// WithServers adds the given value to the Servers field in the declarative configuration -// and returns the receiver, so that objects can be build by chaining "With" function invocations. -// If called multiple times, values provided by each call will be appended to the Servers field. -func (b *BaseStreamConfigApplyConfiguration) WithServers(values ...string) *BaseStreamConfigApplyConfiguration { - for i := range values { - b.Servers = append(b.Servers, values[i]) - } - return b -} - -// WithTLS sets the TLS field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the TLS field is set to the value of the last call. -func (b *BaseStreamConfigApplyConfiguration) WithTLS(value *TLSApplyConfiguration) *BaseStreamConfigApplyConfiguration { - b.TLS = value - return b -} diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/connectionopts.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/connectionopts.go new file mode 100644 index 00000000..05888032 --- /dev/null +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/connectionopts.go @@ -0,0 +1,83 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta2 + +// ConnectionOptsApplyConfiguration represents a declarative configuration of the ConnectionOpts type for use +// with apply. +type ConnectionOptsApplyConfiguration struct { + Account *string `json:"account,omitempty"` + Creds *string `json:"creds,omitempty"` + Nkey *string `json:"nkey,omitempty"` + Servers []string `json:"servers,omitempty"` + TLS *TLSApplyConfiguration `json:"tls,omitempty"` + TLSFirst *bool `json:"tlsFirst,omitempty"` +} + +// ConnectionOptsApplyConfiguration constructs a declarative configuration of the ConnectionOpts type for use with +// apply. +func ConnectionOpts() *ConnectionOptsApplyConfiguration { + return &ConnectionOptsApplyConfiguration{} +} + +// WithAccount sets the Account field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Account field is set to the value of the last call. +func (b *ConnectionOptsApplyConfiguration) WithAccount(value string) *ConnectionOptsApplyConfiguration { + b.Account = &value + return b +} + +// WithCreds sets the Creds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Creds field is set to the value of the last call. +func (b *ConnectionOptsApplyConfiguration) WithCreds(value string) *ConnectionOptsApplyConfiguration { + b.Creds = &value + return b +} + +// WithNkey sets the Nkey field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Nkey field is set to the value of the last call. +func (b *ConnectionOptsApplyConfiguration) WithNkey(value string) *ConnectionOptsApplyConfiguration { + b.Nkey = &value + return b +} + +// WithServers adds the given value to the Servers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Servers field. +func (b *ConnectionOptsApplyConfiguration) WithServers(values ...string) *ConnectionOptsApplyConfiguration { + for i := range values { + b.Servers = append(b.Servers, values[i]) + } + return b +} + +// WithTLS sets the TLS field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TLS field is set to the value of the last call. +func (b *ConnectionOptsApplyConfiguration) WithTLS(value *TLSApplyConfiguration) *ConnectionOptsApplyConfiguration { + b.TLS = value + return b +} + +// WithTLSFirst sets the TLSFirst field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TLSFirst field is set to the value of the last call. +func (b *ConnectionOptsApplyConfiguration) WithTLSFirst(value bool) *ConnectionOptsApplyConfiguration { + b.TLSFirst = &value + return b +} diff --git a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go index ca97ae62..0604bf72 100644 --- a/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go +++ b/pkg/jetstream/generated/applyconfiguration/jetstream/v1beta2/consumerspec.go @@ -18,41 +18,35 @@ package v1beta2 // ConsumerSpecApplyConfiguration represents a declarative configuration of the ConsumerSpec type for use // with apply. type ConsumerSpecApplyConfiguration struct { - AckPolicy *string `json:"ackPolicy,omitempty"` - AckWait *string `json:"ackWait,omitempty"` - BackOff []string `json:"backoff,omitempty"` - Creds *string `json:"creds,omitempty"` - DeliverGroup *string `json:"deliverGroup,omitempty"` - DeliverPolicy *string `json:"deliverPolicy,omitempty"` - DeliverSubject *string `json:"deliverSubject,omitempty"` - Description *string `json:"description,omitempty"` - PreventDelete *bool `json:"preventDelete,omitempty"` - PreventUpdate *bool `json:"preventUpdate,omitempty"` - DurableName *string `json:"durableName,omitempty"` - FilterSubject *string `json:"filterSubject,omitempty"` - FilterSubjects []string `json:"filterSubjects,omitempty"` - FlowControl *bool `json:"flowControl,omitempty"` - HeadersOnly *bool `json:"headersOnly,omitempty"` - HeartbeatInterval *string `json:"heartbeatInterval,omitempty"` - MaxAckPending *int `json:"maxAckPending,omitempty"` - MaxDeliver *int `json:"maxDeliver,omitempty"` - MaxRequestBatch *int `json:"maxRequestBatch,omitempty"` - MaxRequestExpires *string `json:"maxRequestExpires,omitempty"` - MaxRequestMaxBytes *int `json:"maxRequestMaxBytes,omitempty"` - MaxWaiting *int `json:"maxWaiting,omitempty"` - MemStorage *bool `json:"memStorage,omitempty"` - Nkey *string `json:"nkey,omitempty"` - OptStartSeq *int `json:"optStartSeq,omitempty"` - OptStartTime *string `json:"optStartTime,omitempty"` - RateLimitBps *int `json:"rateLimitBps,omitempty"` - ReplayPolicy *string `json:"replayPolicy,omitempty"` - Replicas *int `json:"replicas,omitempty"` - SampleFreq *string `json:"sampleFreq,omitempty"` - Servers []string `json:"servers,omitempty"` - StreamName *string `json:"streamName,omitempty"` - TLS *TLSApplyConfiguration `json:"tls,omitempty"` - Account *string `json:"account,omitempty"` - Metadata map[string]string `json:"metadata,omitempty"` + DurableName *string `json:"durableName,omitempty"` + Description *string `json:"description,omitempty"` + DeliverPolicy *string `json:"deliverPolicy,omitempty"` + OptStartSeq *int `json:"optStartSeq,omitempty"` + OptStartTime *string `json:"optStartTime,omitempty"` + AckPolicy *string `json:"ackPolicy,omitempty"` + AckWait *string `json:"ackWait,omitempty"` + MaxDeliver *int `json:"maxDeliver,omitempty"` + BackOff []string `json:"backoff,omitempty"` + FilterSubject *string `json:"filterSubject,omitempty"` + ReplayPolicy *string `json:"replayPolicy,omitempty"` + RateLimitBps *int `json:"rateLimitBps,omitempty"` + SampleFreq *string `json:"sampleFreq,omitempty"` + MaxWaiting *int `json:"maxWaiting,omitempty"` + MaxAckPending *int `json:"maxAckPending,omitempty"` + HeadersOnly *bool `json:"headersOnly,omitempty"` + MaxRequestBatch *int `json:"maxRequestBatch,omitempty"` + MaxRequestExpires *string `json:"maxRequestExpires,omitempty"` + MaxRequestMaxBytes *int `json:"maxRequestMaxBytes,omitempty"` + InactiveThreshold *string `json:"inactiveThreshold,omitempty"` + Replicas *int `json:"replicas,omitempty"` + MemStorage *bool `json:"memStorage,omitempty"` + FilterSubjects []string `json:"filterSubjects,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` + FlowControl *bool `json:"flowControl,omitempty"` + DeliverSubject *string `json:"deliverSubject,omitempty"` + DeliverGroup *string `json:"deliverGroup,omitempty"` + HeartbeatInterval *string `json:"heartbeatInterval,omitempty"` + StreamName *string `json:"streamName,omitempty"` } // ConsumerSpecApplyConfiguration constructs a declarative configuration of the ConsumerSpec type for use with @@ -61,45 +55,19 @@ func ConsumerSpec() *ConsumerSpecApplyConfiguration { return &ConsumerSpecApplyConfiguration{} } -// WithAckPolicy sets the AckPolicy field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the AckPolicy field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithAckPolicy(value string) *ConsumerSpecApplyConfiguration { - b.AckPolicy = &value - return b -} - -// WithAckWait sets the AckWait field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the AckWait field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithAckWait(value string) *ConsumerSpecApplyConfiguration { - b.AckWait = &value - return b -} - -// WithBackOff adds the given value to the BackOff field in the declarative configuration -// and returns the receiver, so that objects can be build by chaining "With" function invocations. -// If called multiple times, values provided by each call will be appended to the BackOff field. -func (b *ConsumerSpecApplyConfiguration) WithBackOff(values ...string) *ConsumerSpecApplyConfiguration { - for i := range values { - b.BackOff = append(b.BackOff, values[i]) - } - return b -} - -// WithCreds sets the Creds field in the declarative configuration to the given value +// WithDurableName sets the DurableName field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Creds field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithCreds(value string) *ConsumerSpecApplyConfiguration { - b.Creds = &value +// If called multiple times, the DurableName field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithDurableName(value string) *ConsumerSpecApplyConfiguration { + b.DurableName = &value return b } -// WithDeliverGroup sets the DeliverGroup field in the declarative configuration to the given value +// WithDescription sets the Description field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the DeliverGroup field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithDeliverGroup(value string) *ConsumerSpecApplyConfiguration { - b.DeliverGroup = &value +// If called multiple times, the Description field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithDescription(value string) *ConsumerSpecApplyConfiguration { + b.Description = &value return b } @@ -111,43 +79,53 @@ func (b *ConsumerSpecApplyConfiguration) WithDeliverPolicy(value string) *Consum return b } -// WithDeliverSubject sets the DeliverSubject field in the declarative configuration to the given value +// WithOptStartSeq sets the OptStartSeq field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the DeliverSubject field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithDeliverSubject(value string) *ConsumerSpecApplyConfiguration { - b.DeliverSubject = &value +// If called multiple times, the OptStartSeq field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithOptStartSeq(value int) *ConsumerSpecApplyConfiguration { + b.OptStartSeq = &value return b } -// WithDescription sets the Description field in the declarative configuration to the given value +// WithOptStartTime sets the OptStartTime field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Description field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithDescription(value string) *ConsumerSpecApplyConfiguration { - b.Description = &value +// If called multiple times, the OptStartTime field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithOptStartTime(value string) *ConsumerSpecApplyConfiguration { + b.OptStartTime = &value return b } -// WithPreventDelete sets the PreventDelete field in the declarative configuration to the given value +// WithAckPolicy sets the AckPolicy field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the PreventDelete field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithPreventDelete(value bool) *ConsumerSpecApplyConfiguration { - b.PreventDelete = &value +// If called multiple times, the AckPolicy field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithAckPolicy(value string) *ConsumerSpecApplyConfiguration { + b.AckPolicy = &value return b } -// WithPreventUpdate sets the PreventUpdate field in the declarative configuration to the given value +// WithAckWait sets the AckWait field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the PreventUpdate field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithPreventUpdate(value bool) *ConsumerSpecApplyConfiguration { - b.PreventUpdate = &value +// If called multiple times, the AckWait field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithAckWait(value string) *ConsumerSpecApplyConfiguration { + b.AckWait = &value return b } -// WithDurableName sets the DurableName field in the declarative configuration to the given value +// WithMaxDeliver sets the MaxDeliver field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the DurableName field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithDurableName(value string) *ConsumerSpecApplyConfiguration { - b.DurableName = &value +// If called multiple times, the MaxDeliver field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithMaxDeliver(value int) *ConsumerSpecApplyConfiguration { + b.MaxDeliver = &value + return b +} + +// WithBackOff adds the given value to the BackOff field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the BackOff field. +func (b *ConsumerSpecApplyConfiguration) WithBackOff(values ...string) *ConsumerSpecApplyConfiguration { + for i := range values { + b.BackOff = append(b.BackOff, values[i]) + } return b } @@ -159,37 +137,35 @@ func (b *ConsumerSpecApplyConfiguration) WithFilterSubject(value string) *Consum return b } -// WithFilterSubjects adds the given value to the FilterSubjects field in the declarative configuration -// and returns the receiver, so that objects can be build by chaining "With" function invocations. -// If called multiple times, values provided by each call will be appended to the FilterSubjects field. -func (b *ConsumerSpecApplyConfiguration) WithFilterSubjects(values ...string) *ConsumerSpecApplyConfiguration { - for i := range values { - b.FilterSubjects = append(b.FilterSubjects, values[i]) - } +// WithReplayPolicy sets the ReplayPolicy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ReplayPolicy field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithReplayPolicy(value string) *ConsumerSpecApplyConfiguration { + b.ReplayPolicy = &value return b } -// WithFlowControl sets the FlowControl field in the declarative configuration to the given value +// WithRateLimitBps sets the RateLimitBps field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the FlowControl field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithFlowControl(value bool) *ConsumerSpecApplyConfiguration { - b.FlowControl = &value +// If called multiple times, the RateLimitBps field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithRateLimitBps(value int) *ConsumerSpecApplyConfiguration { + b.RateLimitBps = &value return b } -// WithHeadersOnly sets the HeadersOnly field in the declarative configuration to the given value +// WithSampleFreq sets the SampleFreq field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the HeadersOnly field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithHeadersOnly(value bool) *ConsumerSpecApplyConfiguration { - b.HeadersOnly = &value +// If called multiple times, the SampleFreq field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithSampleFreq(value string) *ConsumerSpecApplyConfiguration { + b.SampleFreq = &value return b } -// WithHeartbeatInterval sets the HeartbeatInterval field in the declarative configuration to the given value +// WithMaxWaiting sets the MaxWaiting field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the HeartbeatInterval field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithHeartbeatInterval(value string) *ConsumerSpecApplyConfiguration { - b.HeartbeatInterval = &value +// If called multiple times, the MaxWaiting field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithMaxWaiting(value int) *ConsumerSpecApplyConfiguration { + b.MaxWaiting = &value return b } @@ -201,11 +177,11 @@ func (b *ConsumerSpecApplyConfiguration) WithMaxAckPending(value int) *ConsumerS return b } -// WithMaxDeliver sets the MaxDeliver field in the declarative configuration to the given value +// WithHeadersOnly sets the HeadersOnly field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the MaxDeliver field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithMaxDeliver(value int) *ConsumerSpecApplyConfiguration { - b.MaxDeliver = &value +// If called multiple times, the HeadersOnly field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithHeadersOnly(value bool) *ConsumerSpecApplyConfiguration { + b.HeadersOnly = &value return b } @@ -233,85 +209,83 @@ func (b *ConsumerSpecApplyConfiguration) WithMaxRequestMaxBytes(value int) *Cons return b } -// WithMaxWaiting sets the MaxWaiting field in the declarative configuration to the given value +// WithInactiveThreshold sets the InactiveThreshold field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the MaxWaiting field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithMaxWaiting(value int) *ConsumerSpecApplyConfiguration { - b.MaxWaiting = &value +// If called multiple times, the InactiveThreshold field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithInactiveThreshold(value string) *ConsumerSpecApplyConfiguration { + b.InactiveThreshold = &value return b } -// WithMemStorage sets the MemStorage field in the declarative configuration to the given value +// WithReplicas sets the Replicas field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the MemStorage field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithMemStorage(value bool) *ConsumerSpecApplyConfiguration { - b.MemStorage = &value +// If called multiple times, the Replicas field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithReplicas(value int) *ConsumerSpecApplyConfiguration { + b.Replicas = &value return b } -// WithNkey sets the Nkey field in the declarative configuration to the given value +// WithMemStorage sets the MemStorage field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Nkey field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithNkey(value string) *ConsumerSpecApplyConfiguration { - b.Nkey = &value +// If called multiple times, the MemStorage field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithMemStorage(value bool) *ConsumerSpecApplyConfiguration { + b.MemStorage = &value return b } -// WithOptStartSeq sets the OptStartSeq field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the OptStartSeq field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithOptStartSeq(value int) *ConsumerSpecApplyConfiguration { - b.OptStartSeq = &value +// WithFilterSubjects adds the given value to the FilterSubjects field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the FilterSubjects field. +func (b *ConsumerSpecApplyConfiguration) WithFilterSubjects(values ...string) *ConsumerSpecApplyConfiguration { + for i := range values { + b.FilterSubjects = append(b.FilterSubjects, values[i]) + } return b } -// WithOptStartTime sets the OptStartTime field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the OptStartTime field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithOptStartTime(value string) *ConsumerSpecApplyConfiguration { - b.OptStartTime = &value +// WithMetadata puts the entries into the Metadata field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Metadata field, +// overwriting an existing map entries in Metadata field with the same key. +func (b *ConsumerSpecApplyConfiguration) WithMetadata(entries map[string]string) *ConsumerSpecApplyConfiguration { + if b.Metadata == nil && len(entries) > 0 { + b.Metadata = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Metadata[k] = v + } return b } -// WithRateLimitBps sets the RateLimitBps field in the declarative configuration to the given value +// WithFlowControl sets the FlowControl field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the RateLimitBps field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithRateLimitBps(value int) *ConsumerSpecApplyConfiguration { - b.RateLimitBps = &value +// If called multiple times, the FlowControl field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithFlowControl(value bool) *ConsumerSpecApplyConfiguration { + b.FlowControl = &value return b } -// WithReplayPolicy sets the ReplayPolicy field in the declarative configuration to the given value +// WithDeliverSubject sets the DeliverSubject field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the ReplayPolicy field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithReplayPolicy(value string) *ConsumerSpecApplyConfiguration { - b.ReplayPolicy = &value +// If called multiple times, the DeliverSubject field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithDeliverSubject(value string) *ConsumerSpecApplyConfiguration { + b.DeliverSubject = &value return b } -// WithReplicas sets the Replicas field in the declarative configuration to the given value +// WithDeliverGroup sets the DeliverGroup field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Replicas field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithReplicas(value int) *ConsumerSpecApplyConfiguration { - b.Replicas = &value +// If called multiple times, the DeliverGroup field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithDeliverGroup(value string) *ConsumerSpecApplyConfiguration { + b.DeliverGroup = &value return b } -// WithSampleFreq sets the SampleFreq field in the declarative configuration to the given value +// WithHeartbeatInterval sets the HeartbeatInterval field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the SampleFreq field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithSampleFreq(value string) *ConsumerSpecApplyConfiguration { - b.SampleFreq = &value - return b -} - -// WithServers adds the given value to the Servers field in the declarative configuration -// and returns the receiver, so that objects can be build by chaining "With" function invocations. -// If called multiple times, values provided by each call will be appended to the Servers field. -func (b *ConsumerSpecApplyConfiguration) WithServers(values ...string) *ConsumerSpecApplyConfiguration { - for i := range values { - b.Servers = append(b.Servers, values[i]) - } +// If called multiple times, the HeartbeatInterval field is set to the value of the last call. +func (b *ConsumerSpecApplyConfiguration) WithHeartbeatInterval(value string) *ConsumerSpecApplyConfiguration { + b.HeartbeatInterval = &value return b } @@ -322,33 +296,3 @@ func (b *ConsumerSpecApplyConfiguration) WithStreamName(value string) *ConsumerS b.StreamName = &value return b } - -// WithTLS sets the TLS field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the TLS field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithTLS(value *TLSApplyConfiguration) *ConsumerSpecApplyConfiguration { - b.TLS = value - return b -} - -// WithAccount sets the Account field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Account field is set to the value of the last call. -func (b *ConsumerSpecApplyConfiguration) WithAccount(value string) *ConsumerSpecApplyConfiguration { - b.Account = &value - return b -} - -// WithMetadata puts the entries into the Metadata field in the declarative configuration -// and returns the receiver, so that objects can be build by chaining "With" function invocations. -// If called multiple times, the entries provided by each call will be put on the Metadata field, -// overwriting an existing map entries in Metadata field with the same key. -func (b *ConsumerSpecApplyConfiguration) WithMetadata(entries map[string]string) *ConsumerSpecApplyConfiguration { - if b.Metadata == nil && len(entries) > 0 { - b.Metadata = make(map[string]string, len(entries)) - } - for k, v := range entries { - b.Metadata[k] = v - } - return b -} diff --git a/pkg/jetstream/generated/applyconfiguration/utils.go b/pkg/jetstream/generated/applyconfiguration/utils.go index 7ccfa1ae..0f13a0e2 100644 --- a/pkg/jetstream/generated/applyconfiguration/utils.go +++ b/pkg/jetstream/generated/applyconfiguration/utils.go @@ -37,6 +37,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &jetstreamv1beta2.BaseStreamConfigApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("Condition"): return &jetstreamv1beta2.ConditionApplyConfiguration{} + case v1beta2.SchemeGroupVersion.WithKind("ConnectionOpts"): + return &jetstreamv1beta2.ConnectionOptsApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("Consumer"): return &jetstreamv1beta2.ConsumerApplyConfiguration{} case v1beta2.SchemeGroupVersion.WithKind("ConsumerLimits"):