Skip to content

Commit

Permalink
integrate cert-source lib, add listener CLR file and cert rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Feb 4, 2025
1 parent adfee6a commit c91f23f
Show file tree
Hide file tree
Showing 21 changed files with 1,373 additions and 79 deletions.
4 changes: 3 additions & 1 deletion cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ func initFlags() {
Server.Flags().DurationVar(&c.Proxy.ListenerKeepAlive, "proxy-listener-keep-alive", 60*time.Second, "Keep alive period for an active network connection. If zero, keep-alives are disabled")

Server.Flags().BoolVar(&c.Proxy.TLS.Enable, "proxy-listener-tls-enable", false, "Whether or not to use TLS listener")
Server.Flags().DurationVar(&c.Proxy.TLS.Refresh, "proxy-listener-tls-refresh", 0*time.Second, "Interval for refreshing server TLS certificates. If set to zero, the refresh watch is disabled")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCertFile, "proxy-listener-cert-file", "", "PEM encoded file with server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyFile, "proxy-listener-key-file", "", "PEM encoded file with private key for the server certificate")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerKeyPassword, "proxy-listener-key-password", os.Getenv("PROXY_LISTENER_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Proxy.TLS.CAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCAChainCertFile, "proxy-listener-ca-chain-cert-file", "", "PEM encoded CA's certificate file. If provided, client certificate is required and verified")
Server.Flags().StringVar(&c.Proxy.TLS.ListenerCRLFile, "proxy-listener-crl-file", "", "PEM encoded X509 CRLs file")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")

Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ type Config struct {

TLS struct {
Enable bool
Refresh time.Duration
ListenerCertFile string
ListenerKeyFile string
ListenerKeyPassword string
CAChainCertFile string
ListenerCAChainCertFile string
ListenerCRLFile string
ListenerCipherSuites []string
ListenerCurvePreferences []string
ClientCert struct {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-ldap/ldap/v3 v3.2.3
github.com/google/uuid v1.6.0
github.com/grepplabs/cert-source v0.0.6
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874
github.com/hashicorp/go-plugin v1.6.3
Expand All @@ -26,7 +27,7 @@ require (
github.com/spf13/viper v1.0.2
github.com/stretchr/testify v1.10.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
golang.org/x/net v0.34.0
golang.org/x/oauth2 v0.24.0
google.golang.org/api v0.126.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ github.com/googleapis/gax-go/v2 v2.11.0 h1:9V9PWXEsWnPpQhu/PeQIkS4eGzMlTLGgt80cU
github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/grepplabs/cert-source v0.0.6 h1:FjrFco5wQrMqGI4wzCvkIksK0xOoyIC6FV2Cg53thHg=
github.com/grepplabs/cert-source v0.0.6/go.mod h1:gs3IoykME1cFfZ6/h6hch8yg8ktUInsR9OY2xSHA2r4=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce h1:prjrVgOk2Yg6w+PflHoszQNLTUh4kaByUcEWM/9uin4=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -281,8 +283,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 h1:tBiBTKHnIjovYoLX/TPkcf+OjqqKGQrPtGT3Foz+Pgo=
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76/go.mod h1:SQliXeA7Dhkt//vS29v3zpbEwoa+zb2Cn5xj5uO4K5U=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
Expand Down
65 changes: 21 additions & 44 deletions proxy/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"log/slog"
"net"
"os"
"reflect"
"strings"
"time"

tlsconfig "github.com/grepplabs/cert-source/config"
tlsserver "github.com/grepplabs/cert-source/tls/server"
tlsserverconfig "github.com/grepplabs/cert-source/tls/server/config"
"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/youmark/pkcs8"
Expand Down Expand Up @@ -59,25 +63,6 @@ var (
func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
opts := conf.Proxy.TLS

if opts.ListenerKeyFile == "" || opts.ListenerCertFile == "" {
return nil, errors.New("Listener key and cert files must not be empty")
}
certPEMBlock, err := os.ReadFile(opts.ListenerCertFile)
if err != nil {
return nil, err
}
keyPEMBlock, err := os.ReadFile(opts.ListenerKeyFile)
if err != nil {
return nil, err
}
keyPEMBlock, err = decryptPEM(keyPEMBlock, opts.ListenerKeyPassword)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, err
}
cipherSuites, err := getCipherSuites(opts.ListenerCipherSuites)
if err != nil {
return nil, err
Expand All @@ -86,35 +71,27 @@ func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
if err != nil {
return nil, err
}

cfg := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert,
PreferServerCipherSuites: true,
MinVersion: tls.VersionTLS12,
CurvePreferences: curvePreferences,
CipherSuites: cipherSuites,
}
if opts.CAChainCertFile != "" {
caCertPEMBlock, err := os.ReadFile(opts.CAChainCertFile)
if err != nil {
return nil, err
}
clientCAs := x509.NewCertPool()
if ok := clientCAs.AppendCertsFromPEM(caCertPEMBlock); !ok {
return nil, errors.New("Failed to parse listener root certificate")
}
cfg.ClientCAs = clientCAs
cfg.ClientAuth = tls.RequireAndVerifyClientCert
}

tlsValidateFunc, err := tlsClientCertVerificationFunc(conf)
if err != nil {
return nil, err
}
cfg.VerifyPeerCertificate = tlsValidateFunc

return cfg, nil
tlsConfig, err := tlsserverconfig.GetServerTLSConfig(slog.Default(),
&tlsconfig.TLSServerConfig{
Enable: true,
Refresh: opts.Refresh,
KeyPassword: opts.ListenerKeyPassword,
File: tlsconfig.TLSServerFiles{
Key: opts.ListenerKeyFile,
Cert: opts.ListenerCertFile,
ClientCAs: opts.ListenerCAChainCertFile,
ClientCRL: opts.ListenerCRLFile,
},
},
tlsserver.WithTLSServerVerifyPeerCertificate(tlsValidateFunc),
tlsserver.WithTLSServerCipherSuites(cipherSuites),
tlsserver.WithTLSServerCurvePreferences(curvePreferences),
)
return tlsConfig, nil
}

func getCipherSuites(enabledCipherSuites []string) ([]uint16, error) {
Expand Down
18 changes: 9 additions & 9 deletions proxy/tls_client_cert_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestValidEnabledClientCertSubjectValidate(t *testing.T) {

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestInvalidEnabledClientCertSubjectValidate(t *testing.T) {
}
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestValidEnabledClientCertSubjectMayContainNotRequiredValues(t *testing.T)
}
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestValidEnabledClientCertSubjectMayContainValuesInDifferentOrder(t *testin
}
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestClientCertMultipleSubjects(t *testing.T) {

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestClientCertMultipleSubjectsPatterns(t *testing.T) {

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestClientCertMultiplePatternMatchingFields(t *testing.T) {

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestClientCertMultiplePatternNotMatchingFields(t *testing.T) {

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestClientCertMultiplePatternMatchingFieldsOrderDoesNotMatter(t *testing.T)

c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle.CACert.Name()
c.Proxy.TLS.ListenerCAChainCertFile = bundle.CACert.Name()

c.Kafka.TLS.CAChainCertFile = bundle.CACert.Name()
c.Kafka.TLS.ClientCertFile = bundle.ClientCert.Name()
Expand Down
10 changes: 5 additions & 5 deletions proxy/tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func TestTLSVerifyClientCertDifferentCAs(t *testing.T) {
c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle2.CACert.Name() // client CA
c.Proxy.TLS.ListenerCAChainCertFile = bundle2.CACert.Name() // client CA

c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name()
c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name()
Expand All @@ -335,7 +335,7 @@ func TestTLSVerifyClientCertSameCAs(t *testing.T) {
c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle1.CACert.Name() // client CA
c.Proxy.TLS.ListenerCAChainCertFile = bundle1.CACert.Name() // client CA

c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name()
c.Kafka.TLS.ClientCertFile = bundle1.ClientCert.Name()
Expand All @@ -358,7 +358,7 @@ func TestTLSMissingClientCert(t *testing.T) {
c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle1.CACert.Name() // client CA
c.Proxy.TLS.ListenerCAChainCertFile = bundle1.CACert.Name() // client CA

c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name()

Expand All @@ -379,7 +379,7 @@ func TestTLSBadClientCert(t *testing.T) {
c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle1.CACert.Name() // client CA
c.Proxy.TLS.ListenerCAChainCertFile = bundle1.CACert.Name() // client CA

c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name()
c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name()
Expand Down Expand Up @@ -431,7 +431,7 @@ func configWithCertToCompare(bundle1 *CertsBundle, bundle2 *CertsBundle, sameCer

c.Proxy.TLS.ListenerCertFile = bundle1.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle1.ServerKey.Name()
c.Proxy.TLS.CAChainCertFile = bundle2.CACert.Name() // client CA
c.Proxy.TLS.ListenerCAChainCertFile = bundle2.CACert.Name() // client CA

c.Kafka.TLS.CAChainCertFile = bundle1.ServerCert.Name()
c.Kafka.TLS.ClientCertFile = bundle2.ClientCert.Name()
Expand Down
Loading

0 comments on commit c91f23f

Please sign in to comment.