Skip to content

Commit

Permalink
prepare for rework
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Jan 25, 2025
1 parent b8646e4 commit 90c697b
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 58 deletions.
1 change: 0 additions & 1 deletion cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func initFlags() {
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")

Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ type ListenerConfig struct {
ListenerAddress string
AdvertisedAddress string
}
type IdListenerConfig struct {
BrokerAddress string
Listener net.Listener

type EnhancedListenerConfig struct {
ListenerConfig
BrokerID int32
}

type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down Expand Up @@ -78,7 +80,6 @@ type Config struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DeterministicListeners bool
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
Expand Down
12 changes: 6 additions & 6 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (

func createMetadataResponseSchemaVersions() []Schema {
metadataBrokerV0 := NewSchema("metadata_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand All @@ -52,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
)

metadataBrokerV1 := NewSchema("metadata_broker_v1",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeNullableStr},
)

metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
Expand Down Expand Up @@ -249,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {

func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)

findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand Down Expand Up @@ -341,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
}
}
if port != newPort {
err = broker.Replace(portKeyName, int32(newPort))
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
Expand Down
78 changes: 31 additions & 47 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/sirupsen/logrus"
)

const UnknownBrokerId = -1

type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)

type Listeners struct {
Expand All @@ -25,13 +27,11 @@ type Listeners struct {

listenFunc ListenFunc

deterministicListeners bool
disableDynamicListeners bool
dynamicSequentialMinPort int

brokerToListenerConfig map[string]config.ListenerConfig
brokerIdToIdListenerConfig map[int32]config.IdListenerConfig
lock sync.RWMutex
brokerToListenerConfig map[string]config.EnhancedListenerConfig
lock sync.RWMutex
}

func NewListeners(cfg *config.Config) (*Listeners, error) {
Expand Down Expand Up @@ -66,24 +66,20 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
return nil, err
}

brokerIdToIdListenerConfig := make(map[int32]config.IdListenerConfig)

return &Listeners{
defaultListenerIP: defaultListenerIP,
dynamicAdvertisedListener: dynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
brokerIdToIdListenerConfig: brokerIdToIdListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
deterministicListeners: cfg.Proxy.DeterministicListeners,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
defaultListenerIP: defaultListenerIP,
dynamicAdvertisedListener: dynamicAdvertisedListener,
connSrc: make(chan Conn, 1),
brokerToListenerConfig: brokerToListenerConfig,
tcpConnOptions: tcpConnOptions,
listenFunc: listenFunc,
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
}, nil
}

func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) {
brokerToListenerConfig := make(map[string]config.ListenerConfig)
func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.EnhancedListenerConfig, error) {
brokerToListenerConfig := make(map[string]config.EnhancedListenerConfig)

for _, v := range cfg.Proxy.BootstrapServers {
if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok {
Expand All @@ -93,7 +89,10 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
continue
}
logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
brokerToListenerConfig[v.BrokerAddress] = v
brokerToListenerConfig[v.BrokerAddress] = config.EnhancedListenerConfig{
ListenerConfig: v,
BrokerID: UnknownBrokerId,
}
}

externalToListenerConfig := make(map[string]config.ListenerConfig)
Expand All @@ -118,7 +117,10 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo
continue
}
logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress)
brokerToListenerConfig[v.BrokerAddress] = v
brokerToListenerConfig[v.BrokerAddress] = config.EnhancedListenerConfig{
ListenerConfig: v,
BrokerID: UnknownBrokerId,
}
}
return brokerToListenerConfig, nil
}
Expand All @@ -132,26 +134,13 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, br

p.lock.RLock()
listenerConfig, ok := p.brokerToListenerConfig[brokerAddress]
idListenerConfig, brokerIdFound := p.brokerIdToIdListenerConfig[brokerId]
p.lock.RUnlock()

if ok {
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId)
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
}
if !p.disableDynamicListeners {
if brokerIdFound {
logrus.Infof("Broker ID %d has a new advertised listener, closing existing dynamic listener", brokerId)
// Existing broker ID found, but with a different upstream broker
// Close existing listener, remove two mappings:
// * ID to removed upstream broker
// * removed upstream broker
idListenerConfig.Listener.Close()
p.lock.Lock()
delete(p.brokerIdToIdListenerConfig, brokerId)
delete(p.brokerToListenerConfig, idListenerConfig.BrokerAddress)
p.lock.Unlock()
}
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
return p.ListenDynamicInstance(brokerAddress, brokerId)
}
Expand All @@ -166,15 +155,9 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
return util.SplitHostPort(v.AdvertisedAddress)
}

var defaultListenerAddress string

if p.deterministicListeners {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort+int(brokerId)))
} else {
defaultListenerAddress = net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
}
defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort))
if p.dynamicSequentialMinPort != 0 {
p.dynamicSequentialMinPort += 1
}

cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress}
Expand All @@ -191,10 +174,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
}

advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port))
p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress}
p.brokerIdToIdListenerConfig[brokerId] = config.IdListenerConfig{BrokerAddress: brokerAddress, Listener: l}

logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress)
p.brokerToListenerConfig[brokerAddress] = config.EnhancedListenerConfig{
ListenerConfig: config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress},
BrokerID: brokerId,
}
logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", address, brokerAddress, brokerId, advertisedAddress)

return dynamicAdvertisedListener, int32(port), nil
}
Expand Down

0 comments on commit 90c697b

Please sign in to comment.