Skip to content

Commit

Permalink
Add Metadata Response (Version: 8)
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Sep 2, 2019
1 parent 1df00fa commit c227df5
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 3 deletions.
61 changes: 59 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,8 @@ Use localhost:32400, localhost:32401 and localhost:32402 as bootstrap servers


### Connect to Kafka running in Kubernetes example (kafka proxy runs locally)

kafka.properties of one node Kafka
#### one node Kafka cluster
kafka.properties

```
broker.id=0
Expand All @@ -508,6 +508,63 @@ kafka-proxy server --bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" --d

Use localhost:19092 as bootstrap servers

#### 3 nodes Kafka cluster

[strimzi 0.13.0 CRD](https://strimzi.io/)

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: test-cluster
namespace: kafka
spec:
kafka:
version: 2.3.0
replicas: 3
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
num.partitions: 60
default.replication.factor: 3
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 20Gi
deleteClaim: true
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 5Gi
deleteClaim: true
entityOperator:
topicOperator: {}
userOperator: {}
```
```bash
kubectl port-forward -n kafka test-cluster-kafka-0 9092:9092
kubectl port-forward -n kafka test-cluster-kafka-1 9093:9092
kubectl port-forward -n kafka test-cluster-kafka-2 9094:9092

kafka-proxy server --log-level debug \
--bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" \
--bootstrap-server-mapping "127.0.0.1:9093,0.0.0.0:19093" \
--bootstrap-server-mapping "127.0.0.1:9094,0.0.0.0:19094" \
--dial-address-mapping "test-cluster-kafka-0.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9092" \
--dial-address-mapping "test-cluster-kafka-1.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9093" \
--dial-address-mapping "test-cluster-kafka-2.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9094"
```

Use localhost:19092 as bootstrap servers

### Embedded third-party source code

* [Cloud SQL Proxy](https://github.com/GoogleCloudPlatform/cloudsql-proxy)
Expand Down
19 changes: 18 additions & 1 deletion proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func createMetadataResponseSchemaVersions() []Schema {
&array{name: "partition_metadata", ty: partitionMetadataV7},
)

topicMetadataV8 := NewSchema("topic_metadata_v8",
&field{name: "error_code", ty: typeInt16},
&field{name: "name", ty: typeStr},
&field{name: "is_internal", ty: typeBool},
&array{name: "partition_metadata", ty: partitionMetadataV7},
&field{name: "topic_authorized_operations", ty: typeInt32},
)

metadataResponseV1 := NewSchema("metadata_response_v1",
&array{name: brokersKeyName, ty: metadataBrokerV1},
&field{name: "controller_id", ty: typeInt32},
Expand Down Expand Up @@ -138,7 +146,16 @@ func createMetadataResponseSchemaVersions() []Schema {
&array{name: "topic_metadata", ty: topicMetadataV7},
)

return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7}
metadataResponseV8 := NewSchema("metadata_response_v8",
&field{name: "throttle_time_ms", ty: typeInt32},
&array{name: brokersKeyName, ty: metadataBrokerV1},
&field{name: "cluster_id", ty: typeNullableStr},
&field{name: "controller_id", ty: typeInt32},
&array{name: "topic_metadata", ty: topicMetadataV8},
&field{name: "cluster_authorized_operations", ty: typeInt32},
)

return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8}
}

func createFindCoordinatorResponseSchemaVersions() []Schema {
Expand Down
213 changes: 213 additions & 0 deletions proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,219 @@ func TestMetadataResponseV7(t *testing.T) {
}
a.Equal(expected, dc.AttrValues())
}

func TestMetadataResponseV8(t *testing.T) {
/*
Metadata Response (Version: 8) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
throttle_time_ms => INT32
brokers => node_id host port rack
node_id => INT32
host => STRING
port => INT32
rack => NULLABLE_STRING
cluster_id => NULLABLE_STRING
controller_id => INT32
topics => error_code name is_internal [partitions] topic_authorized_operations
error_code => INT16
name => STRING
is_internal => BOOLEAN
partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
error_code => INT16
partition_index => INT32
leader_id => INT32
leader_epoch => INT32
replica_nodes => INT32
isr_nodes => INT32
offline_replicas => INT32
topic_authorized_operations => INT32
cluster_authorized_operations => INT32
*/

apiVersion := int16(8)

bytes := []byte{
// throttle_time_ms
0x00, 0x00, 0x00, 0x01, // 1
// brokers
0x00, 0x00, 0x00, 0x03,
// brokers[0]
0x00, 0x00, 0xab, 0xff, // 44031
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
0x00, 0x00, 0x00, 0x33, // 51
0x00, 0x00, // ''
// brokers[1]
0x00, 0x01, 0x02, 0x03, // 66051
0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm',
0x00, 0x00, 0x01, 0x11, // 273
0x00, 0x07, 'e', 'u', 'w', 'e', 's', 't', '1',
// brokers[2]
0x00, 0x00, 0x00, 0x02, // 2
0x00, 0x09, 'k', 'a', 'f', 'k', 'a', '.', 'o', 'r', 'g',
0x00, 0x00, 0xd0, 0xff, // 53503
0xff, 0xff, // -1 is nil'

// cluster_id
0x00, 0x07, 'm', 'y', 'k', 'a', 'f', 'k', 'a',

// controller_id
0x00, 0x00, 0xe1, 0xb2, // 57778

// topic_metadata
0x00, 0x00, 0x00, 0x02,

// topic_metadata[0]
0x00, 0x00,
0x00, 0x03, 'f', 'o', 'o',
0x01, // true
// partition_metadata
0x00, 0x00, 0x00, 0x01,
0x00, 0x04,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x08,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x08, // topic_authorized_operations 8
// topic_metadata[1]
0x00, 0x00,
0x00, 0x03, 'b', 'a', 'r',
0x00, // false
// partition_metadata
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x04, // topic_authorized_operations 4
// cluster_authorized_operations 5
0x00, 0x00, 0x00, 0x05,
}

a := assert.New(t)

schema := metadataResponseSchemaVersions[apiVersion]

s, err := DecodeSchema(bytes, schema)
a.Nil(err)

dc := NewDecodeCheck()
dc.Traverse(s)

expected := []string{
"throttle_time_ms int32 1",
"[brokers]",
"brokers struct",
"node_id int32 44031",
"host string localhost",
"port int32 51",
"rack *string ",
"brokers struct",
"node_id int32 66051",
"host string google.com",
"port int32 273",
"rack *string euwest1",
"brokers struct",
"node_id int32 2",
"host string kafka.org",
"port int32 53503",
"rack *string <nil>",
"cluster_id *string mykafka",
"controller_id int32 57778",
"[topic_metadata]",
"topic_metadata struct",
"error_code int16 0",
"name string foo",
"is_internal bool true",
"[partition_metadata]",
"partition_metadata struct",
"error_code int16 4",
"partition int32 1",
"leader int32 7",
"leader_epoch int32 8",
"[replicas]",
"replicas int32 1",
"replicas int32 2",
"replicas int32 3",
"[isr]",
"isr int32 3",
"isr int32 2",
"[offline_replicas]",
"offline_replicas int32 5",
"offline_replicas int32 6",
"offline_replicas int32 7",
"topic_authorized_operations int32 8",
"topic_metadata struct",
"error_code int16 0",
"name string bar",
"is_internal bool false",
"[partition_metadata]",
"topic_authorized_operations int32 4",
"cluster_authorized_operations int32 5",
}
a.Equal(expected, dc.AttrValues())
resp, err := EncodeSchema(s, schema)
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier)
a.Nil(err)
resp, err = modifier.Apply(resp)
a.Nil(err)
s, err = DecodeSchema(resp, schema)
a.Nil(err)
dc = NewDecodeCheck()
dc.Traverse(s)
expected = []string{
"throttle_time_ms int32 1",
"[brokers]",
"brokers struct",
"node_id int32 44031",
"host string myhost1", // replaced
"port int32 34001", // replaced
"rack *string ",
"brokers struct",
"node_id int32 66051",
"host string myhost2", // replaced
"port int32 34002", // replaced
"rack *string euwest1",
"brokers struct",
"node_id int32 2",
"host string myhost3", // replaced
"port int32 34003", // replaced
"rack *string <nil>",
"cluster_id *string mykafka",
"controller_id int32 57778",
"[topic_metadata]",
"topic_metadata struct",
"error_code int16 0",
"name string foo",
"is_internal bool true",
"[partition_metadata]",
"partition_metadata struct",
"error_code int16 4",
"partition int32 1",
"leader int32 7",
"leader_epoch int32 8",
"[replicas]",
"replicas int32 1",
"replicas int32 2",
"replicas int32 3",
"[isr]",
"isr int32 3",
"isr int32 2",
"[offline_replicas]",
"offline_replicas int32 5",
"offline_replicas int32 6",
"offline_replicas int32 7",
"topic_authorized_operations int32 8",
"topic_metadata struct",
"error_code int16 0",
"name string bar",
"is_internal bool false",
"[partition_metadata]",
"topic_authorized_operations int32 4",
"cluster_authorized_operations int32 5",

}
a.Equal(expected, dc.AttrValues())
}
func TestFindCoordinatorResponseV0(t *testing.T) {
/*
FindCoordinator Response (Version: 0) => error_code coordinator
Expand Down

0 comments on commit c227df5

Please sign in to comment.