Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intercept and rewrite ApiVersionsResponse to prevent negotiation of API keys that kafka-proxy doesn't support #194

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
if err != nil {
return false, nil, err
}


// Reminder: When adding support for new versions of the produce request, also update proxy/protocol/responses.go
// Change 'apiKeyProduceMaxVersion' when adding new version support
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
// CorrelationID + ClientID
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
Expand Down
153 changes: 152 additions & 1 deletion proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@ package protocol
import (
"errors"
"fmt"
"math"

"github.com/grepplabs/kafka-proxy/config"
)

const (
apiKeyProduce = 0
apiKeyMetadata = 3
apiKeyFindCoordinator = 10
apiKeyApiVersions = 18

// intercept and update ApiVersions response to prevent requests/responses that can't be parsed by Kafka-Proxy
apiKeyApiVersionsMaxVersion = 4
apiKeyMetadataMaxVersion = 13
apiKeyFindCoordinatorMaxVersion = 6
// produce requests are parsed by proxy/processor_default.go mustReply()
apiKeyProduceMaxVersion = 11

brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"
apiKeysKeyname = "api_keys"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand All @@ -23,6 +34,7 @@ const (
var (
metadataResponseSchemaVersions = createMetadataResponseSchemaVersions()
findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions()
apiVersionsResponseSchemaVersions = createApiVersionsResponseSchemaVersions()
)

func createMetadataResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -244,7 +256,34 @@ func createMetadataResponseSchemaVersions() []Schema {
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12}
metadataResponseV13 := NewSchema("metadata_response_v13",
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9},
&Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr},
&Mfield{Name: "controller_id", Ty: TypeInt32},
&CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12},
&Mfield{Name: "error_code", Ty: TypeInt16},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

// Reminder: When adding support for new versions of the metadataResponse, also update proxy/protocol/responses.go
// Change 'apiKeyMetadataMaxVersion' when adding new version support
return []Schema{
metadataResponseV0,
metadataResponseV1,
metadataResponseV2,
metadataResponseV3,
metadataResponseV4,
metadataResponseV5,
metadataResponseV6,
metadataResponseV7,
metadataResponseV8,
metadataResponseV9,
metadataResponseV10,
metadataResponseV11,
metadataResponseV12,
metadataResponseV13,
}
}

func createFindCoordinatorResponseSchemaVersions() []Schema {
Expand Down Expand Up @@ -297,9 +336,119 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorResponseV5 := findCoordinatorResponseV4
findCoordinatorResponseV6 := findCoordinatorResponseV5

// Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go
// Change 'apiKeyFindCoordinatorMaxVersion' when adding new version support
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func createApiVersionsResponseSchemaVersions() []Schema {
apiVersionKeyV0 := NewSchema("api_versions_key_v0",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
)

apiVersionSchemaV3 := NewSchema("api_versions_key_schema3",
&Mfield{Name: "api_key", Ty: TypeInt16},
&Mfield{Name: "min_version", Ty: TypeInt16},
&Mfield{Name: "max_version", Ty: TypeInt16},
&SchemaTaggedFields{"api_versions_tagged_fields"},
)

apiVersionsResponseV0 := NewSchema("api_versions_response_v0",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
)

// Version 1 adds throttle time to the response.
apiVersionsResponseV1 := NewSchema("api_versions_response_v1",
&Mfield{Name: "error_code", Ty: TypeInt16},
&Array{Name: apiKeysKeyname, Ty: apiVersionKeyV0},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
)

// Starting in version 2, on quota violation, brokers send out responses before throttling.
apiVersionsResponseV2 := apiVersionsResponseV1

// Version 3 is the first flexible version. Tagged fields are only supported in the body but
// not in the header. The length of the header must not change in order to guarantee the
// backward compatibility.
//
// Starting from Apache Kafka 2.4 (KIP-511), ApiKeys field is populated with the supported
// versions of the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
apiVersionsResponseV3 := NewSchema("api_versions_response_v3",
&Mfield{Name: "error_code", Ty: TypeInt16},
&CompactArray{Name: apiKeysKeyname, Ty: apiVersionSchemaV3},
&Mfield{Name: "throttle_time_ms", Ty: TypeInt32},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)

// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion from being 0.
apiVersionsResponseV4 := apiVersionsResponseV3

// Reminder: When adding support for new versions of the findCoordinatorResponse, also update proxy/protocol/responses.go
// Change 'apiKeyApiVersionsMaxVersion' when adding new version support
return []Schema{
apiVersionsResponseV0,
apiVersionsResponseV1,
apiVersionsResponseV2,
apiVersionsResponseV3,
apiVersionsResponseV4,
}
}

func modifyApiVersionsResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
}
if fn == nil {
return errors.New("net address mapper must not be nil")
}
apiVersionsArray, ok := decodedStruct.Get(apiKeysKeyname).([]interface{})
if !ok {
return errors.New("api versions not found")
}
for _, apiVersionElement := range apiVersionsArray {
apiVersion := apiVersionElement.(*Struct)
apiKey, ok := apiVersion.Get("api_key").(int16)
if !ok {
return errors.New("api_keys.api_key not found")
}
maxVersion, ok := apiVersion.Get("max_version").(int16)
if !ok {
return errors.New("api_keys.max_version not found")
}

limitVersion := int16(math.MaxInt16)
switch apiKey {
case apiKeyProduce:
if maxVersion > apiKeyProduceMaxVersion {
limitVersion = apiKeyProduceMaxVersion
}
case apiKeyMetadata:
if maxVersion > apiKeyMetadataMaxVersion {
limitVersion = apiKeyMetadataMaxVersion
}
case apiKeyFindCoordinator:
if maxVersion > apiKeyFindCoordinatorMaxVersion {
limitVersion = apiKeyFindCoordinatorMaxVersion
}
case apiKeyApiVersions:
if maxVersion > apiKeyApiVersionsMaxVersion {
limitVersion = apiKeyApiVersionsMaxVersion
}
}
if maxVersion > limitVersion {
err := apiVersion.Replace("max_version", limitVersion)
if err != nil {
return err
}
}
}

return nil
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
if decodedStruct == nil {
return errors.New("decoded struct must not be nil")
Expand Down Expand Up @@ -446,6 +595,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse)
case apiKeyFindCoordinator:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse)
case apiKeyApiVersions:
return newResponseModifier(apiKey, apiVersion, addressMappingFunc, apiVersionsResponseSchemaVersions, modifyApiVersionsResponse)
default:
return nil, nil
}
Expand Down