diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index c4a32c80..b80c542d 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -216,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht } } } - - } \ No newline at end of file diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index af1494ee..9179a9f1 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) { trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp())); trackMetas.put("kafka_topic", record.topic()); trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition())); + trackMetas.put("kafka_record_key", String.valueOf(record.key())); if (HOSTNAME != null) trackMetas.put("kafka_connect_host", HOSTNAME); event.addFields(trackMetas); diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java index 2a8435bc..f5ae931e 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkTaskTest.java @@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) { Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition")); Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic")); Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp")); + Assert.assertEquals("test", event.getFields().get("kafka_record_key")); j++; } @@ -441,7 +442,7 @@ private Collection createSinkRecords(int numOfRecords, String value) private Collection createSinkRecords(int numOfRecords, int start, String value) { List records = new ArrayList<>(); for (int i = start; i < start + numOfRecords; i++) { - SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE); + SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE); records.add(rec); } return records; diff --git a/test/conftest.py b/test/conftest.py index 0db4956d..45dd400d 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -39,7 +39,7 @@ def setup(request): def pytest_configure(): # Generate message data topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic", - "test_splunk_hec_malformed_events","epoch_format","date_format"] + "test_splunk_hec_malformed_events","epoch_format","date_format","record_key"] create_kafka_topics(config, topics) producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], @@ -67,9 +67,9 @@ def pytest_configure(): ('splunk.header.source', b'kafka_custom_header_source'), ('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')] producer.send(config["kafka_header_topic"], msg, headers=headers_to_send) - producer.send("test_splunk_hec_malformed_events", {}) producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]}) + producer.send("record_key",{"timestamp": config['timestamp']},b"{}") protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many') timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}") timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}") diff --git a/test/lib/connect_params.py b/test/lib/connect_params.py index 3878b4ce..5cc6e4ef 100644 --- a/test/lib/connect_params.py +++ b/test/lib/connect_params.py @@ -204,5 +204,10 @@ "splunk_hec_raw": False, "enable_timestamp_extraction" : "true", "timestamp_regex": r"\\\"time\\\":\\s*\\\"(?