
Merge pull request #371 from deep-splunk/record-key-extraction
Record key extraction
hvaghani221 authored Dec 26, 2022
2 parents 3f4db26 + b58585d commit afae667
Showing 8 changed files with 35 additions and 9 deletions.
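
In short, this commit adds the Kafka record key to the metadata fields that the sink task attaches to each HEC event when splunk.hec.track.data is enabled, covers the behaviour with unit and integration tests, and wires a splunk_hec_track_data option through the test connector templates. A minimal sketch of the enriched metadata, assuming only what the diffs below show (SinkRecord is Kafka Connect's own class, and the map mirrors the trackMetas map in SplunkSinkTask.java):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;

    // Sketch only, not the connector's actual source: the per-record metadata
    // an HEC event carries once splunk.hec.track.data is enabled, per the diff below.
    final class RecordKeyEnrichmentSketch {
        static Map<String, String> trackMetas(SinkRecord record) {
            Map<String, String> metas = new HashMap<>();
            metas.put("kafka_timestamp", String.valueOf(record.timestamp()));
            metas.put("kafka_topic", record.topic());
            metas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
            // New in this commit: the record key is stringified into the event fields.
            metas.put("kafka_record_key", String.valueOf(record.key()));
            return metas;
        }
    }
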
@@ -216,6 +216,4 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht
}
}
}


}
1 change: 1 addition & 0 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
@@ -429,6 +429,7 @@ private Event createHecEventFrom(final SinkRecord record) {
trackMetas.put("kafka_timestamp", String.valueOf(record.timestamp()));
trackMetas.put("kafka_topic", record.topic());
trackMetas.put("kafka_partition", String.valueOf(record.kafkaPartition()));
trackMetas.put("kafka_record_key", String.valueOf(record.key()));
if (HOSTNAME != null)
trackMetas.put("kafka_connect_host", HOSTNAME);
event.addFields(trackMetas);
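
A behavioural note on the added line: String.valueOf(record.key()) never throws for a key-less record. A null key is rendered as the literal string "null", and a String key such as "test" passes through unchanged, which is what the test changes below rely on. A tiny standalone illustration (plain Java, not connector code):

    // Illustration only: how String.valueOf renders typical record keys.
    public class KeyStringificationSketch {
        public static void main(String[] args) {
            Object missingKey = null;
            System.out.println(String.valueOf(missingKey)); // prints "null" -- records without a key still get a field value
            System.out.println(String.valueOf("test"));     // prints "test" -- matches the assertion added in the test below
        }
    }
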
@@ -415,6 +415,7 @@ private void putWithSuccess(boolean raw, boolean withMeta) {
Assert.assertEquals(String.valueOf(1), event.getFields().get("kafka_partition"));
Assert.assertEquals(new UnitUtil(0).configProfile.getTopics(), event.getFields().get("kafka_topic"));
Assert.assertEquals(String.valueOf(0), event.getFields().get("kafka_timestamp"));
Assert.assertEquals("test", event.getFields().get("kafka_record_key"));
j++;
}

@@ -441,7 +442,7 @@ private Collection<SinkRecord> createSinkRecords(int numOfRecords, String value) {
private Collection<SinkRecord> createSinkRecords(int numOfRecords, int start, String value) {
List<SinkRecord> records = new ArrayList<>();
for (int i = start; i < start + numOfRecords; i++) {
SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, null, null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
SinkRecord rec = new SinkRecord(new UnitUtil(0).configProfile.getTopics(), 1, null, "test", null, value, i, 0L, TimestampType.NO_TIMESTAMP_TYPE);
records.add(rec);
}
return records;
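
For context, the SinkRecord constructor used here takes (topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType), so the one-line change above replaces the previously null record key with the string "test", which the new assertion earlier in this file then expects back under kafka_record_key. A self-contained sketch of building such a keyed record, with every value other than the key a placeholder:

    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Sketch only: a SinkRecord carrying an explicit String key, mirroring the test change above.
    final class KeyedSinkRecordSketch {
        static SinkRecord build(String topic, String value, long offset) {
            // topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType
            return new SinkRecord(topic, 1, null, "test", null, value, offset, 0L, TimestampType.NO_TIMESTAMP_TYPE);
        }
    }
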
4 changes: 2 additions & 2 deletions test/conftest.py
@@ -39,7 +39,7 @@ def setup(request):
def pytest_configure():
# Generate message data
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],"prototopic",
"test_splunk_hec_malformed_events","epoch_format","date_format"]
"test_splunk_hec_malformed_events","epoch_format","date_format","record_key"]

create_kafka_topics(config, topics)
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
@@ -67,9 +67,9 @@ def pytest_configure():
('splunk.header.source', b'kafka_custom_header_source'),
('splunk.header.sourcetype', b'kafka_custom_header_sourcetype')]
producer.send(config["kafka_header_topic"], msg, headers=headers_to_send)

producer.send("test_splunk_hec_malformed_events", {})
producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
producer.send("record_key",{"timestamp": config['timestamp']},b"{}")
protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
9 changes: 7 additions & 2 deletions test/lib/connect_params.py
@@ -204,5 +204,10 @@
"splunk_hec_raw": False,
"enable_timestamp_extraction" : "true",
"timestamp_regex": r"\\\"time\\\":\\s*\\\"(?<time>.*?)\"",
"timestamp_format": "epoch"}
]
"timestamp_format": "epoch"},
{"name": "test_extracted_record_key",
"splunk_sourcetypes": "track_record_key",
"topics": "record_key",
"splunk_hec_raw": False,
"splunk_hec_track_data": "true"}
]
3 changes: 2 additions & 1 deletion test/lib/connector.template
@@ -50,6 +50,7 @@
"value.converter.schemas.enable": "{{value_converter_schemas_enable}}",
"enable.timestamp.extraction": "{{enable_timestamp_extraction}}",
"timestamp.regex": "{{timestamp_regex}}",
"timestamp.format": "{{timestamp_format}}"
"timestamp.format": "{{timestamp_format}}",
"splunk.hec.track.data": "{{splunk_hec_track_data}}"
}
}
3 changes: 2 additions & 1 deletion test/lib/data_gen.py
@@ -38,7 +38,8 @@ def generate_connector_content(input_disc=None):
"value_converter_schemas_enable": "false",
"enable_timestamp_extraction": "false",
"regex": "",
"timestamp_format": ""
"timestamp_format": "",
"splunk_hec_track_data": "false"
}

if input_disc:
19 changes: 19 additions & 0 deletions test/testcases/test_data_enrichment.py
@@ -59,3 +59,22 @@ def test_line_breaking_configuration(self, setup, test_case, test_input, expecte
setup["timestamp"], setup["timestamp"], setup["timestamp"])
assert actual_raw_data == expected_data, \
f'\nActual value: \n{actual_raw_data} \ndoes not match expected value: \n{expected_data}'

@pytest.mark.parametrize("test_scenario, test_input, expected", [
("record_key_extraction", "sourcetype::track_record_key", "{}"),
])
def test_record_key_data_enrichment(self, setup, test_scenario, test_input, expected):
logger.info(f"testing {test_scenario} input={test_input} expected={expected} event(s)")
search_query = f"index={setup['splunk_index']} | search {test_input} | fields *"
logger.info(search_query)
events = check_events_from_splunk(start_time="-15m@m",
url=setup["splunkd_url"],
user=setup["splunk_user"],
query=[f"search {search_query}"],
password=setup["splunk_password"])
logger.info("Splunk received %s events in the last hour", len(events))

if(len(events)==1):
assert events[0]["kafka_record_key"] == expected
else:
assert False,"No event found or duplicate events found"
