Skip to content

Commit

Permalink
Avoid excess disk writes to cache directory. README tweaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelattwood committed Feb 18, 2025
1 parent fc8ad13 commit 7a196bb
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 22 deletions.
22 changes: 9 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

## JetStream Controller

The JetStream controllers allows you to manage [NATS JetStream](https://github.com/nats-io/jetstream) [Streams](https://docs.nats.io/nats-concepts/jetstream/streams), [Consumers](https://docs.nats.io/nats-concepts/jetstream/consumers), [Key/Value Stores](https://docs.nats.io/nats-concepts/jetstream/key-value-store), and [Object Stores](https://docs.nats.io/nats-concepts/jetstream/obj_store) via Kubernetes CRDs.
The JetStream controllers allows you to manage [NATS JetStream](https://docs.nats.io/nats-concepts/jetstream) [Streams](https://docs.nats.io/nats-concepts/jetstream/streams), [Consumers](https://docs.nats.io/nats-concepts/jetstream/consumers), [Key/Value Stores](https://docs.nats.io/nats-concepts/jetstream/key-value-store), and [Object Stores](https://docs.nats.io/nats-concepts/jetstream/obj_store) via Kubernetes CRDs.

Resources managed by NACK controllers are expected to _exclusively_ be managed by NACK, and configuration state will be enforced if mutated by an external client.

Expand All @@ -21,20 +21,18 @@ Install with Helm:

```
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm upgrade --install nats nats/nats --set config.jetstream.enabled=true
helm upgrade --install nats nats/nats --set config.jetstream.enabled=true --set config.cluster.enabled=true
helm upgrade --install nack nats/nack --set jetstream.nats.url=nats://nats.default.svc.cluster.local:4222
```

#### (Optional) Enable Experimental `controller-runtime` Controllers

> **Note**: If migrating an existing install to the `controller-runtime` architecture, it is advisable to first enable with the `-read-only` flag.
> **Note**: The updated controllers will more reliably enforce resource state. If migrating from an older version of NACK, as long as all NATS resources are in-sync with NACK resources no modifications are expected.
>
> The `jetstream-controller` logs will preview any changes that would be made to existing resources.
>
> The updated architecture will more reliably enforce state. If all resources are in-sync with NACK, no changes are expected.
> The `jetstream-controller` logs will contain a diff of any changes the controller has made.
```
helm upgrade -n nack nack nats/nack --set jetstream.additionalArgs={-control-loop=true}
helm upgrade nack nats/nack --set jetstream.additionalArgs={--control-loop=true}
```

#### Creating Streams and Consumers
Expand Down Expand Up @@ -107,7 +105,7 @@ $ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/ex
# Check if it was successfully created.
$ kubectl get streams
NAME STATE STREAM NAME SUBJECTS
mystream Created mystream [orders.*]
mystream Ready mystream [orders.*]

# Create a push-based consumer
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/examples/consumer_push.yml
Expand All @@ -118,8 +116,8 @@ $ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/ex
# Check if they were successfully created.
$ kubectl get consumers
NAME STATE STREAM CONSUMER ACK POLICY
my-pull-consumer Created mystream my-pull-consumer explicit
my-push-consumer Created mystream my-push-consumer none
my-pull-consumer Ready mystream my-pull-consumer explicit
my-push-consumer Ready mystream my-push-consumer none

# If you end up in an Errored state, run kubectl describe for more info.
# kubectl describe streams mystream
Expand Down Expand Up @@ -384,7 +382,7 @@ For more information see the
```
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm upgrade --install --create-namespace --namespace nats nats nats/nats
helm upgrade --install nats nats/nats
```
### Configuring
Expand All @@ -410,5 +408,3 @@ Build Docker image
```sh
make nats-boot-config-docker ver=1.2.3
```
## API Reference
7 changes: 6 additions & 1 deletion cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

Expand Down Expand Up @@ -73,7 +74,7 @@ func run() error {
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
cacheDir := flag.String("cache-dir", "", "Directory to store cached credential and TLS files")
controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.")
controlLoopSyncInterval := flag.Duration("sync-interval", 5*time.Minute, "Interval to perform scheduled reconcile")
controlLoopSyncInterval := flag.Duration("sync-interval", time.Minute, "Interval to perform scheduled reconcile")

flag.Parse()

Expand Down Expand Up @@ -183,6 +184,10 @@ func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, control
return fmt.Errorf("create cache dir: %w", err)
}
defer os.RemoveAll(cacheDir)
cacheDir, err = filepath.Abs(cacheDir)
if err != nil {
return fmt.Errorf("get absolute cache dir: %w", err)
}
controllerCfg.CacheDir = cacheDir
} else {
if _, err := os.Stat(controllerCfg.CacheDir); os.IsNotExist(err) {
Expand Down
4 changes: 2 additions & 2 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1309,7 +1309,7 @@ spec:
- name: KV Store Name
type: string
description: The name of the KV Store.
jsonPath: .spec.name
jsonPath: .spec.bucket
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
Expand Down Expand Up @@ -1450,4 +1450,4 @@ spec:
- name: Object Store Name
type: string
description: The name of the Object Store.
jsonPath: .spec.name
jsonPath: .spec.bucket
62 changes: 56 additions & 6 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -10,6 +11,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -67,6 +69,7 @@ type jsController struct {
config *NatsConfig
controllerConfig *Config
cacheDir string
cacheLock sync.Mutex
}

func (c *jsController) RequeueInterval() time.Duration {
Expand Down Expand Up @@ -156,6 +159,9 @@ func (c *jsController) natsConfigFromOpts(opts api.ConnectionOpts, ns string) (*
accountOverlay.ServerURL = strings.Join(account.Spec.Servers, ",")
}

c.cacheLock.Lock()
defer c.cacheLock.Unlock()

if account.Spec.TLS != nil && account.Spec.TLS.Secret != nil {
tlsSecret := &v1.Secret{}
err := c.Get(ctx,
Expand Down Expand Up @@ -188,6 +194,15 @@ func (c *jsController) natsConfigFromOpts(opts api.ConnectionOpts, ns string) (*
case account.Spec.TLS.RootCAs:
rootCAPath := filepath.Join(accDir, k)
accountOverlay.CAs = append(accountOverlay.CAs, rootCAPath)

if _, err := os.Stat(rootCAPath); err == nil {
caBytes, err := os.ReadFile(rootCAPath)
// Skip file write if data is unchanged
if err == nil && bytes.Equal(caBytes, v) {
continue
}
}

if err := os.WriteFile(rootCAPath, v, 0o644); err != nil {
return nil, err
}
Expand All @@ -198,11 +213,34 @@ func (c *jsController) natsConfigFromOpts(opts api.ConnectionOpts, ns string) (*
accountOverlay.Certificate = certPath
accountOverlay.Key = keyPath

if err := os.WriteFile(certPath, certData, 0o644); err != nil {
return nil, err
writeCert := true
if _, err := os.Stat(certPath); err == nil {
fileBytes, err := os.ReadFile(certPath)
// Skip disk write if data is unchanged
if err == nil && bytes.Equal(fileBytes, certData) {
writeCert = false
}
}
if err := os.WriteFile(keyPath, keyData, 0o644); err != nil {
return nil, err

if writeCert {
if err := os.WriteFile(certPath, certData, 0o644); err != nil {
return nil, err
}
}

writeKey := true
if _, err := os.Stat(keyPath); err == nil {
fileBytes, err := os.ReadFile(keyPath)
// Skip disk write if data is unchanged
if err == nil && bytes.Equal(fileBytes, keyData) {
writeKey = false
}
}

if writeKey {
if err := os.WriteFile(keyPath, keyData, 0o600); err != nil {
return nil, err
}
}
}
} else if account.Spec.TLS != nil {
Expand Down Expand Up @@ -234,8 +272,20 @@ func (c *jsController) natsConfigFromOpts(opts api.ConnectionOpts, ns string) (*
if credsBytes, ok := credsSecret.Data[account.Spec.Creds.File]; ok {
filePath := filepath.Join(accDir, account.Spec.Creds.File)
accountOverlay.Credentials = filePath
if err := os.WriteFile(filePath, credsBytes, 0o600); err != nil {
return nil, err

writeCreds := true
if _, err := os.Stat(filePath); err == nil {
fileBytes, err := os.ReadFile(filePath)
// Skip disk write if data is unchanged
if err == nil && bytes.Equal(fileBytes, credsBytes) {
writeCreds = false
}
}

if writeCreds {
if err := os.WriteFile(filePath, credsBytes, 0o600); err != nil {
return nil, err
}
}
}
} else if account.Spec.Creds != nil {
Expand Down

0 comments on commit 7a196bb

Please sign in to comment.