Skip to content

Commit

Permalink
Add Account Controller (#224)
Browse files Browse the repository at this point in the history
* Add explicit Scheme field to Reconciler structs to better match convention

* Update spec to assume consumer field 'Name' over deprecated 'DurableName'

* Add account controller and support for account CRD auth config

* Update CRDs. Slight change to connection options

* Namespace and error handling improvements

* Improve deleted log message

* Namespace fix

* Add check for removed Push Consumer options

* Revert consumer name change

* Add InactiveThreshold parsing

* Fix test

* Add Watch for Account resource changes to trigger reconcile of dependent resources. Improve connection opts handling

* Add actual/desired state comparison for stream/consumer to avoid unnecessary update calls. Corrected ready state

* Fix duplicate resource tracking

* Improve config comparison logic

* Add flags for sync interval and cache directory

* Add back generation changed filter. Move finalizer add to after deletion check.

* Rework Reconcile scheduled sync
  • Loading branch information
samuelattwood authored Jan 16, 2025
1 parent 6fa2f62 commit 4e12db4
Show file tree
Hide file tree
Showing 26 changed files with 1,573 additions and 733 deletions.
25 changes: 19 additions & 6 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/nats-io/nack/controllers/jetstream"
"github.com/nats-io/nack/internal/controller"
jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
v1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
clientset "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -67,10 +67,12 @@ func run() error {
ca := flag.String("tlsca", "", "NATS TLS certificate authority chain")
tlsfirst := flag.Bool("tlsfirst", false, "If enabled, forces explicit TLS without waiting for Server INFO")
server := flag.String("s", "", "NATS Server URL")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config. Ignored if running with control loop, CRD options will always override global config")
cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup")
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
cacheDir := flag.String("cache-dir", "", "Directory to store cached credential and TLS files")
controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.")
controlLoopSyncInterval := flag.Duration("sync-interval", 5*time.Minute, "Interval to perform scheduled reconcile")

flag.Parse()

Expand Down Expand Up @@ -108,9 +110,12 @@ func run() error {
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
ReadOnly: *readOnly,
Namespace: *namespace,
CacheDir: *cacheDir,
RequeueInterval: *controlLoopSyncInterval,
}

return runControlLoop(config, natsCfg, controllerCfg)
}

Expand Down Expand Up @@ -160,17 +165,25 @@ func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, control
// Setup scheme
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(jetstreamnatsiov1beta2.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: klog.NewKlogr().WithName("controller-runtime"),
// TODO Add full configuration
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
}

if controllerCfg.CacheDir == "" {
cacheDir, err := os.MkdirTemp(".", "nack")
if err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
defer os.RemoveAll(cacheDir)
controllerCfg.CacheDir = cacheDir
}

err = controller.RegisterAll(mgr, natsCfg, controllerCfg)
if err != nil {
return fmt.Errorf("register jetstream controllers: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsm jsmClientFunc
}
}
// Lookup the UserCredentials.
if acc.Spec.Creds != nil {
if acc.Spec.Creds != nil && acc.Spec.Creds.Secret != nil {
secretName := acc.Spec.Creds.Secret.Name
secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsm jsmClientFunc) (e
}
}
// Lookup the UserCredentials.
if acc.Spec.Creds != nil {
if acc.Spec.Creds != nil && acc.Spec.Creds.Secret != nil {
secretName := acc.Spec.Creds.Secret.Name
secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{})
if err != nil {
Expand Down
124 changes: 82 additions & 42 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ spec:
type: array
items:
type: string
tlsFirst:
description: When true, the KV Store will iniate TLS before server INFO.
type: boolean
default: false
status:
type: object
properties:
Expand Down Expand Up @@ -513,6 +517,11 @@ spec:
spec:
type: object
properties:
durableName:
description: The name of the Consumer.
type: string
pattern: '^[^.*>]+$'
minLength: 1
streamName:
description: The name of the Stream to create the Consumer in.
type: string
Expand All @@ -533,11 +542,6 @@ spec:
optStartTime:
description: Time format must be RFC3339.
type: string
durableName:
description: The name of the Consumer.
type: string
pattern: '^[^.*>]+$'
minLength: 1
deliverSubject:
description: The subject to deliver observed messages, when not set, a pull-based Consumer is created.
type: string
Expand Down Expand Up @@ -626,6 +630,32 @@ spec:
type: object
additionalProperties:
type: string
account:
description: Name of the account to which the Consumer belongs.
type: string
pattern: '^[^.*>]*$'
creds:
description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path.
type: string
default: ''
nkey:
description: NATS user NKey for connecting to servers.
type: string
default: ''
preventDelete:
description: When true, the managed Consumer will not be deleted when the resource is deleted
type: boolean
default: false
preventUpdate:
description: When true, the managed Consumer will not be updated when the resource is updated
type: boolean
default: false
servers:
description: A list of servers for creating consumer
type: array
items:
type: string
default: []
tls:
description: A client's TLS certs and keys.
type: object
Expand All @@ -641,30 +671,8 @@ spec:
type: array
items:
type: string
servers:
description: A list of servers for creating consumer
type: array
items:
type: string
default: []
creds:
description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path.
type: string
default: ''
nkey:
description: NATS user NKey for connecting to servers.
type: string
default: ''
account:
description: Name of the account to which the Consumer belongs.
type: string
pattern: '^[^.*>]*$'
preventDelete:
description: When true, the managed Consumer will not be deleted when the resource is deleted
type: boolean
default: false
preventUpdate:
description: When true, the managed Consumer will not be updated when the resource is updated
tlsFirst:
description: When true, the KV Store will iniate TLS before server INFO.
type: boolean
default: false
status:
Expand Down Expand Up @@ -991,6 +999,19 @@ spec:
type: string
pattern: '^[^.*>]*$'
minLength: 1
creds:
description: The creds to be used to connect to the NATS Service.
type: object
properties:
secret:
type: object
properties:
name:
description: Name of the secret with the creds.
type: string
file:
description: Credentials file, generated with github.com/nats-io/nsc tool.
type: string
servers:
description: A list of servers to connect.
type: array
Expand All @@ -1017,19 +1038,30 @@ spec:
key:
description: Filename of the TLS cert key.
type: string
creds:
description: The creds to be used to connect to the NATS Service.
type: object
properties:
secret:
type: object
properties:
name:
description: Name of the secret with the creds.
type: string
file:
description: Credentials file, generated with github.com/nats-io/nsc tool.
type: string
tlsFirst:
description: When true, the KV Store will iniate TLS before server INFO.
type: boolean
default: false
status:
type: object
properties:
observedGeneration:
type: integer
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastTransitionTime:
type: string
reason:
type: string
message:
type: string
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down Expand Up @@ -1212,6 +1244,10 @@ spec:
type: array
items:
type: string
tlsFirst:
description: When true, the KV Store will iniate TLS before server INFO.
type: boolean
default: false
status:
type: object
properties:
Expand Down Expand Up @@ -1348,6 +1384,10 @@ spec:
type: array
items:
type: string
tlsFirst:
description: When true, the KV Store will iniate TLS before server INFO.
type: boolean
default: false
status:
type: object
properties:
Expand Down
Loading

0 comments on commit 4e12db4

Please sign in to comment.