diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index ee0b28f8..c4a32c80 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -158,7 +158,17 @@ private void validateSplunkConfigurations(final Map configs) thr } private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException { - Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))}; + Header[] headers; + if (connectorConfig.ack) { + headers = new Header[]{ + new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)), + new BasicHeader("X-Splunk-Request-Channel", java.util.UUID.randomUUID().toString()) + }; + } else { + headers = new Header[]{ + new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken)), + }; + } String endpoint = "/services/collector"; String url = connectorConfig.splunkURI + endpoint; final HttpPost httpPost = new HttpPost(url); diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java index 58308348..07c302d5 100644 --- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java +++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java @@ -131,6 +131,44 @@ public void testInvalidKerberosOnlyKeytabSet() { assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set"); } + @Test + public void testInvalidJsonEventEnrichmentConfig1() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("tasks_max", "3"); + configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + + @Test + public void testInvalidJsonEventEnrichmentConfig2() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("splunk.hec.json.event.enrichment", "testing-testing non KV"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + + @Test + public void testInvalidJsonEventEnrichmentConfig3() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("tasks_max", "3"); + configs.put("splunk.hec.json.event.enrichment", "k1=v1 k2=v2"); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); + } + @Test public void testInvalidToken() { final Map configs = new HashMap<>(); @@ -144,6 +182,18 @@ public void testInvalidToken() { Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs)); } + @Test + public void testNullHecToken() { + final Map configs = new HashMap<>(); + addNecessaryConfigs(configs); + SinkConnector connector = new SplunkSinkConnector(); + configs.put("topics", "b"); + configs.put("splunk.hec.token", null); + MockHecClientWrapper clientInstance = new MockHecClientWrapper(); + ((SplunkSinkConnector) connector).setHecInstance(clientInstance); + Assertions.assertThrows(java.lang.NullPointerException.class, ()->connector.validate(configs)); + } + @Test public void testInvalidIndex() { final Map configs = new HashMap<>(); diff --git a/test/lib/connect_params.py b/test/lib/connect_params.py index 1833292b..3878b4ce 100644 --- a/test/lib/connect_params.py +++ b/test/lib/connect_params.py @@ -83,10 +83,6 @@ "tasks_max": "3", "splunk_hec_raw": False, "splunk_hec_json_event_enrichment": "chars=test_tasks_max_3_hec_raw_false"}, - {"name": "test_tasks_max_null", - "tasks_max": "null", - "splunk_hec_raw": False, - "splunk_hec_json_event_enrichment": "chars=test_tasks_max_null"}, {"name": "test_1_source_hec_raw_true", "splunk_hec_raw": True, "splunk_sources": "test_1_source_hec_raw_true"}, @@ -179,12 +175,6 @@ "splunk_hec_raw": True, "splunk_hec_json_event_formatted": "false", "splunk_sourcetypes": "test_splunk_hec_json_event_formatted_false_raw_data"}, - {"name": "test_empty_hec_token", - "splunk_hec_token": None, - "splunk_hec_json_event_enrichment": "chars=test_empty_hec_token"}, - {"name": "test_incorrect_hec_token", - "splunk_hec_token": "dummy-tocken", - "splunk_hec_json_event_enrichment": "chars=test_incorrect_hec_token"}, {"name": "test_splunk_hec_empty_event", "topics": "test_splunk_hec_malformed_events", "splunk_hec_raw": False, @@ -215,4 +205,4 @@ "enable_timestamp_extraction" : "true", "timestamp_regex": r"\\\"time\\\":\\s*\\\"(?