Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* Refactor to classes #79

Open
wants to merge 8 commits into
base: testNewConfigsLogstash
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 34 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ Perform configuration before sending events from Logstash to Azure Data Explorer

```ruby
output {
kusto {
path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
app_id => "<application id>"
app_key => "<application key/secret>"
app_tenant => "<tenant id>"
database => "<database name>"
table => "<target table>"
json_mapping => "<mapping name>"
proxy_host => "<proxy host>"
proxy_port => <proxy port>
proxy_protocol => <"http"|"https">
}
kusto {
ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
app_id => "<application id>"
app_key => "<application key/secret>"
app_tenant => "<tenant id>"
database => "<database name>"
table => "<target table>"
json_mapping => "<mapping name>"
proxy_host => "<proxy host>"
proxy_port => <proxy port>
proxy_protocol => <"http"|"https">
max_size => 10
max_interval => 10
latch_timeout => 60
}
}
```
More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html)
Expand All @@ -56,22 +58,22 @@ More information about configuring Logstash can be found in the [logstash config

| Parameter Name | Description | Notes |
| --- | --- | --- |
| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required
| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required|
| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional|
| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional|
| **database**| Database name to place events | Required |
| **table** | Target table name to place events | Required
| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required |
| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional |
| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional |
| **database** | Database name to place events | Required |
| **table** | Target table name to place events | Required |
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional |
| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | |
| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| |
| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| |
| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| |
| **proxy_port** | The proxy port for the proxy. Defaults to 80.| |
| **proxy_protocol** | The proxy server protocol , is one of http or https.| |
| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional |
| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional |
| **proxy_protocol** | The proxy server protocol, is one of http or https. | Optional |
| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional |
| **latch_timeout** | Latch timeout in seconds, defaults to 60. This is the maximum wait time after which the flushing attempt is timed out and the network is considered to be down. The system waits for the network to be back to retry flushing the same batch. | Optional |

> Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options)

> Note: **path** config parameter is no longer used in the latest release (3.0.0) and will be deprecated in future releases

```bash
export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989"
```
Expand All @@ -81,12 +83,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox

| Version | Release Date | Notes |
| --- | --- | --- |
| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK |
| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped |
| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|
| 3.0.0 | 2024-11-01 | Updated configuration options |
| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library |
| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK |
| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution |
| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped |
| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|


## Development Requirements
Expand Down
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ repositories {
// update dependencies to bom azure-sdk-bom/1.2.24

dependencies {
implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0'
implementation 'com.azure:azure-core-http-netty:1.15.1'
implementation 'com.azure:azure-core:1.49.1'
implementation 'com.azure:azure-data-tables:12.4.2'
Expand All @@ -52,7 +52,7 @@ dependencies {
implementation 'com.nimbusds:nimbus-jose-jwt:9.40'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.13'
implementation 'com.univocity:univocity-parsers:2.9.1'
implementation 'commons-codec:commons-codec:1.16.1'
implementation 'commons-codec:commons-codec:1.17.1'
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
Expand Down
32 changes: 19 additions & 13 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
require 'logstash/errors'

require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require 'logstash/outputs/kusto/custom_size_based_buffer'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -69,19 +69,18 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :proxy_protocol, validate: :string, required: false , default: 'http'

# Maximum size of the buffer before it gets flushed, defaults to 10MB
config :max_size, validate: :number, default: 10
config :max_size, validate: :number, required: false , default: 10

# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
config :max_interval, validate: :number, default: 10
config :max_interval, validate: :number, required: false , default: 10

# Latch timeout in seconds, defaults to 60
config :latch_timeout, validate: :number, required: false, default: 60


default :codec, 'json_lines'

def register
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end

@io_mutex = Mutex.new

final_mapping = json_mapping
Expand All @@ -91,13 +90,22 @@ def register
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor)


kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, latch_timeout, executor)

# Deprecation warning for path
if @path
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
end
sleep(30)
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end
end


Expand All @@ -114,7 +122,6 @@ def multi_receive_encoded(events_and_encoded)

def close
@logger.info("Closing Kusto output plugin")

begin
@buffer.shutdown unless @buffer.nil?
@logger.info("Buffer shutdown") unless @buffer.nil?
Expand All @@ -130,7 +137,6 @@ def close
@logger.error("Error stopping ingestor: #{e.message}")
@logger.error(e.backtrace.join("\n"))
end

@logger.info("Kusto output plugin Closed")
end

Expand Down
Loading
Loading