Skip to content

Commit

Permalink
Improve config comparison logic
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Jan 15, 2025
1 parent 682073f commit 8e896c5
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 205 deletions.
6 changes: 3 additions & 3 deletions internal/controller/account_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
// Update ready status to unknown when no status is set
if len(account.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
account.Status.Conditions = updateReadyCondition(account.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
account.Status.Conditions = updateReadyCondition(account.Status.Conditions, v1.ConditionUnknown, stateReconciling, "Starting reconciliation")
err := r.Status().Update(ctx, account)
if err != nil {
return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
account.Status.Conditions = updateReadyCondition(
account.Status.Conditions,
v1.ConditionFalse,
"DeletionBlocked",
stateFinalizing,
"Account has dependent resources that must be deleted first",
)
if err := r.Status().Update(ctx, account); err != nil {
Expand All @@ -140,7 +140,7 @@ func (r *AccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
account.Status.Conditions = updateReadyCondition(
account.Status.Conditions,
v1.ConditionTrue,
"Ready",
stateReady,
"Account is ready",
)
if err := r.Status().Update(ctx, account); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/account_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ var _ = Describe("Account Controller", func() {
assertReadyStateMatches(
account.Status.Conditions[0],
v1.ConditionFalse,
"DeletionBlocked",
stateFinalizing,
"Account has dependent resources that must be deleted first",
time.Now(),
)
Expand Down
144 changes: 78 additions & 66 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,20 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

if consumer.Spec.FlowControl || consumer.Spec.DeliverSubject != "" || consumer.Spec.DeliverGroup != "" || consumer.Spec.HeartbeatInterval != "" {
log.Info("FlowControl, DeliverSubject, DeliverGroup, and HeartbeatInterval are Push Consumer options, which are no longer supported. Skipping consumer creation or update.")
log.Info("FlowControl, DeliverSubject, DeliverGroup, and HeartbeatInterval are Push Consumer options, which are not supported. Skipping consumer creation or update.")
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, "Push Consumer options are not supported.")
if err := r.Status().Update(ctx, consumer); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
}
return ctrl.Result{}, nil
}

// Create or update stream
if err := r.createOrUpdate(ctx, log, consumer); err != nil {
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, stateErrored, err.Error())
if err := r.Status().Update(ctx, consumer); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
}
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil
Expand All @@ -129,15 +137,21 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
return fmt.Errorf("update ready condition: %w", err)
}

storedState, err := getStoredConsumerState(consumer)
if err != nil {
log.Error(err, "Failed to fetch stored state.")
}

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
if errors.Is(err, jetstream.ErrConsumerNotFound) || errors.Is(err, jetstream.ErrJetStreamNotEnabled) || errors.Is(err, jetstream.ErrJetStreamNotEnabledForAccount) {
return nil
}
return err
_, err := getServerConsumerState(ctx, js, consumer)
// If we have no known state for this consumer it has never been reconciled.
// If we are also receiving an error fetching state, either the consumer does not exist
// or this resource config is invalid.
if err != nil && storedState == nil {
return nil
}

return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
Expand Down Expand Up @@ -178,38 +192,27 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
}

err = r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
exists := false
c, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if err == nil {
exists = true
} else if !errors.Is(err, jetstream.ErrConsumerNotFound) {
return err
storedState, err := getStoredConsumerState(consumer)
if err != nil {
log.Error(err, "Failed to fetch stored consumer state.")
}

// Check against known state. Skip Update if converged
// Storing returned state from the server avoids
// having to check default values
if exists {
var knownState *jetstream.ConsumerConfig
if state, ok := consumer.Annotations[stateAnnotationConsumer]; ok {
err := json.Unmarshal([]byte(state), &knownState)
if err != nil {
log.Error(err, "Failed to unmarshal known state from annotation.")
}
}

if knownState != nil {
converged, err := compareConsumerConfig(&c.CachedInfo().Config, knownState)
if err != nil {
log.Error(err, "Failed to compare consumer config.")
}
serverState, err := getServerConsumerState(ctx, js, consumer)
if err != nil {
return err
}

if converged {
return nil
}
// Check against known state. Skip Update if converged.
// Storing returned state from the server avoids have to
// check default values or call Update on already converged resources
if storedState != nil && serverState != nil && consumer.Status.ObservedGeneration == consumer.Generation {
diff := compareConfigState(storedState, serverState)

log.Info("Consumer config drifted from desired state.")
if diff == "" {
return nil
}

log.Info("Consumer config drifted from desired state.", "diff", diff)
}

if r.ReadOnly() {
Expand All @@ -222,35 +225,36 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
var updatedConsumer jetstream.Consumer
err = nil

if !exists {
if serverState == nil {
log.Info("Creating Consumer.")
updatedConsumer, err = js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
}

if !consumer.Spec.PreventUpdate {
if err != nil {
return err
}
} else if !consumer.Spec.PreventUpdate {
log.Info("Updating Consumer.")
updatedConsumer, err = js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
if err != nil {
return err
}
} else {
log.Info("Skipping Consumer update.",
"preventUpdate", consumer.Spec.PreventUpdate,
)
}
if err != nil {
return err
}

if updatedConsumer != nil {
// Store known state in annotation
knownState, err := json.Marshal(updatedConsumer.CachedInfo().Config)
updatedState, err := json.Marshal(updatedConsumer.CachedInfo().Config)
if err != nil {
log.Error(err, "Failed to marshal known state to annotation.")
} else {
if consumer.Annotations == nil {
consumer.Annotations = map[string]string{}
}
consumer.Annotations[stateAnnotationStream] = string(knownState)
return err
}

if consumer.Annotations == nil {
consumer.Annotations = map[string]string{}
}
consumer.Annotations[stateAnnotationConsumer] = string(updatedState)

return r.Update(ctx, consumer)
}

Expand Down Expand Up @@ -281,6 +285,32 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return nil
}

func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
var storedState *jetstream.ConsumerConfig
if state, ok := consumer.Annotations[stateAnnotationConsumer]; ok {
err := json.Unmarshal([]byte(state), &storedState)
if err != nil {
return nil, err
}
}

return storedState, nil
}

// Fetch the current state of the consumer from the server.
// ErrConsumerNotFound is considered a valid response and does not return error
func getServerConsumerState(ctx context.Context, js jetstream.JetStream, consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
c, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if errors.Is(err, jetstream.ErrConsumerNotFound) {
return nil, nil
}
if err != nil {
return nil, err
}

return &c.CachedInfo().Config, nil
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {
config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Expand Down Expand Up @@ -373,24 +403,6 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
return config, nil
}

func compareConsumerConfig(actual *jetstream.ConsumerConfig, desired *jetstream.ConsumerConfig) (bool, error) {
if actual == nil || desired == nil {
return false, nil
}

actualJson, err := json.Marshal(actual)
if err != nil {
return false, fmt.Errorf("error marshaling source config: %w", err)
}

desiredJson, err := json.Marshal(desired)
if err != nil {
return false, fmt.Errorf("error marshaling target config: %w", err)
}

return string(actualJson) == string(desiredJson), nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ConsumerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ var _ = Describe("Consumer Controller", func() {
Expect(k8sClient.Get(ctx, typeNamespacedName, consumer)).To(Succeed())
Expect(consumer.Status.Conditions).To(HaveLen(1))

assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now())
assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionUnknown, stateReconciling, "Starting reconciliation", time.Now())
})
})

Expand Down Expand Up @@ -212,7 +212,7 @@ var _ = Describe("Consumer Controller", func() {
assertReadyStateMatches(
consumer.Status.Conditions[0],
v1.ConditionFalse,
"Errored",
stateErrored,
"stream", // Not existing stream as message
time.Now(),
)
Expand All @@ -230,7 +230,7 @@ var _ = Describe("Consumer Controller", func() {

By("checking if the ready state was updated")
Expect(consumer.Status.Conditions).To(HaveLen(1))
assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionTrue, "Reconciling", "created or updated", time.Now())
assertReadyStateMatches(consumer.Status.Conditions[0], v1.ConditionTrue, stateReady, "created or updated", time.Now())

By("checking if the observed generation matches")
Expect(consumer.Status.ObservedGeneration).To(Equal(consumer.Generation))
Expand Down Expand Up @@ -321,7 +321,7 @@ var _ = Describe("Consumer Controller", func() {
When("read-only mode is enabled", func() {
BeforeEach(func(ctx SpecContext) {
By("setting read only on the controller")
readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true})
readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{ReadOnly: true})
Expect(err).NotTo(HaveOccurred())
controller = &ConsumerReconciler{
Scheme: k8sClient.Scheme(),
Expand Down Expand Up @@ -359,7 +359,7 @@ var _ = Describe("Consumer Controller", func() {
When("namespace restriction is enabled", func() {
BeforeEach(func(ctx SpecContext) {
By("setting a namespace on the resource")
namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"})
namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: clientUrl}, &Config{Namespace: "other-namespace"})
Expect(err).NotTo(HaveOccurred())
controller = &ConsumerReconciler{
Scheme: k8sClient.Scheme(),
Expand Down
5 changes: 5 additions & 0 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/google/go-cmp/cmp"
js "github.com/nats-io/nack/controllers/jetstream"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats.go/jetstream"
Expand Down Expand Up @@ -264,3 +265,7 @@ func updateReadyCondition(conditions []api.Condition, status v1.ConditionStatus,
func jsonString(v string) []byte {
return []byte("\"" + v + "\"")
}

func compareConfigState(actual any, desired any) string {
return cmp.Diff(actual, desired)
}
Loading

0 comments on commit 8e896c5

Please sign in to comment.