Skip to content

Commit

Permalink
Merge pull request #191 from grepplabs/pr-183-alt
Browse files Browse the repository at this point in the history
Rework of PR# 183  feat: Add support for deterministic listener ports (based on broker ID)
  • Loading branch information
everesio authored Jan 28, 2025
2 parents e5074c8 + 93770ec commit 2318bd6
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 76 deletions.
1 change: 1 addition & 0 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func initFlags() {
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")

Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ var (
Version = "unknown"
)

type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)

type ListenerConfig struct {
BrokerAddress string
ListenerAddress string
AdvertisedAddress string
}

type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down Expand Up @@ -74,6 +75,7 @@ type Config struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DeterministicListeners bool
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
Expand Down
7 changes: 4 additions & 3 deletions proxy/processor_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxy
import (
"bytes"
"encoding/hex"
"testing"
"time"

"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestHandleRequest(t *testing.T) {
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
}

func TestHandleResponse(t *testing.T) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" {
switch brokerPort {
case 19092:
Expand Down
25 changes: 17 additions & 8 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand All @@ -26,7 +27,7 @@ var (

func createMetadataResponseSchemaVersions() []Schema {
metadataBrokerV0 := NewSchema("metadata_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand All @@ -51,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
)

metadataBrokerV1 := NewSchema("metadata_broker_v1",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeNullableStr},
)

metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
Expand Down Expand Up @@ -248,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {

func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)

findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand Down Expand Up @@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
if !ok {
return errors.New("broker.port not found")
}
nodeId, ok := broker.Get(nodeKeyName).(int32)
if !ok {
return errors.New("broker.node_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand All @@ -336,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
}
}
if port != newPort {
err = broker.Replace(portKeyName, int32(newPort))
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
if !ok {
return errors.New("coordinator.port not found")
}
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
if !ok {
return errors.New("coordinator.node_id not found")
}

if host == "" && port <= 0 {
return nil
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package protocol
import (
"encoding/hex"
"fmt"
"github.com/google/uuid"
"reflect"
"strings"
"testing"

"github.com/google/uuid"

"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -20,7 +21,7 @@ var (
// topic_metadata
0x00, 0x00, 0x00, 0x00}

testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "myhost1", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand All @@ -31,7 +32,7 @@ var (
return "", 0, errors.New("unexpected data")
}

testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 19092 {
return "myhost1", 34001, nil
} else if brokerHost == "localhost" && brokerPort == 29092 {
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "azure.microsoft.com", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand Down
Loading

0 comments on commit 2318bd6

Please sign in to comment.