diff --git a/README.md b/README.md index b256a266..c3e2f1e0 100755 --- a/README.md +++ b/README.md @@ -35,19 +35,21 @@ Perform configuration before sending events from Logstash to Azure Data Explorer ```ruby output { - kusto { - path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-.kusto.windows.net/" - app_id => "" - app_key => "" - app_tenant => "" - database => "" - table => "" - json_mapping => "" - proxy_host => "" - proxy_port => - proxy_protocol => <"http"|"https"> - } + kusto { + ingest_url => "https://ingest-.kusto.windows.net/" + app_id => "" + app_key => "" + app_tenant => "" + database => "" + table => "" + json_mapping => "" + proxy_host => "" + proxy_port => + proxy_protocol => <"http"|"https"> + max_size => 10 + max_interval => 10 + latch_timeout => 60 + } } ``` More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html) @@ -56,22 +58,22 @@ More information about configuring Logstash can be found in the [logstash config | Parameter Name | Description | Notes | | --- | --- | --- | -| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required -| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required| -| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional| -| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. 
| Optional| -| **database**| Database name to place events | Required | -| **table** | Target table name to place events | Required +| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required | +| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional | +| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional | +| **database** | Database name to place events | Required | +| **table** | Target table name to place events | Required | | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional | -| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | | -| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| | -| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| | -| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| | -| **proxy_port** | The proxy port for the proxy. Defaults to 80.| | -| **proxy_protocol** | The proxy server protocol , is one of http or https.| | +| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional | +| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional | +| **proxy_protocol** | The proxy server protocol, is one of http or https. 
| Optional | +| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional | +| **max_interval** | Maximum interval (in seconds) before the buffer gets flushed, defaults to 10. | Optional | +| **latch_timeout** | Latch timeout in seconds, defaults to 60. This is the maximum wait time after which the flushing attempt is timed out and the network is considered to be down. The system waits for the network to come back before retrying the same batch. | Optional | > Note: LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options) +> Note: The **path** configuration parameter is deprecated as of release 3.0.0 and will be removed in a future release + ```bash export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989" ``` @@ -81,12 +83,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989" | Version | Release Date | Notes | | --- | --- | --- | -| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library | -| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK | -| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | -| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | -| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | -| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version.
Tests have been performed on Logstash 6.x and up.| +| 3.0.0 | 2024-11-01 | Updated configuration options | +| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library | +| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK | +| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided, Logstash output JSON attribute names will be used for column resolution | +| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can completely be skipped | +| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVEs in common-text & outdated Jackson libraries | +| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version.
Tests have been performed on Logstash 6.x and up.| ## Development Requirements diff --git a/build.gradle b/build.gradle index a56b7a35..e92dcb9c 100644 --- a/build.gradle +++ b/build.gradle @@ -29,8 +29,8 @@ repositories { // update dependencies to bom azure-sdk-bom/1.2.24 dependencies { - implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0' - implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0' + implementation 'com.microsoft.azure.kusto:kusto-data:6.0.0' + implementation 'com.microsoft.azure.kusto:kusto-ingest:6.0.0' implementation 'com.azure:azure-core-http-netty:1.15.1' implementation 'com.azure:azure-core:1.49.1' implementation 'com.azure:azure-data-tables:12.4.2' @@ -52,7 +52,7 @@ dependencies { implementation 'com.nimbusds:nimbus-jose-jwt:9.40' implementation 'com.nimbusds:oauth2-oidc-sdk:11.13' implementation 'com.univocity:univocity-parsers:2.9.1' - implementation 'commons-codec:commons-codec:1.16.1' + implementation 'commons-codec:commons-codec:1.17.1' implementation 'commons-logging:commons-logging:1.3.1' implementation 'io.github.resilience4j:resilience4j-core:1.7.1' implementation 'io.github.resilience4j:resilience4j-retry:1.7.1' diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 6fa160b4..a7d2ac9a 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -5,8 +5,8 @@ require 'logstash/errors' require 'logstash/outputs/kusto/ingestor' -require 'logstash/outputs/kusto/interval' require 'logstash/outputs/kusto/custom_size_based_buffer' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' ## # This plugin sends messages to Azure Kusto in batches. 
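The `max_size` / `max_interval` pair introduced in this release drives a size-or-interval flush rule. The following is a rough, self-contained sketch of that rule only; `TinyBuffer` is a hypothetical name, and the plugin's actual `CustomSizeBasedBuffer` (changed later in this diff) additionally handles locking, on-disk persistence, and network-down retries:

```ruby
# Minimal sketch of the size-or-interval flush rule behind max_size (MB)
# and max_interval (seconds). Illustrative only; names are hypothetical.
class TinyBuffer
  def initialize(max_size_mb:, max_interval:, &flush)
    @max_bytes    = max_size_mb * 1024 * 1024
    @max_interval = max_interval
    @flush        = flush
    @items        = []
    @bytes        = 0
    @last_flush   = Time.now.to_i
  end

  # Append an event, flushing when the byte threshold is crossed.
  def <<(event)
    @items << event
    @bytes += event.bytesize
    try_flush
  end

  # Flush when forced, when max_interval has elapsed, or when the
  # accumulated bytes reach max_size.
  def try_flush(force: false)
    due = (Time.now.to_i - @last_flush) >= @max_interval
    return if !force && !due && @bytes < @max_bytes
    outgoing, @items, @bytes = @items, [], 0
    @last_flush = Time.now.to_i
    @flush.call(outgoing)
  end
end
```

A timer thread calling `try_flush(force: true)` every `max_interval` seconds, as the buffer's `prepare_flush` loop does, completes the picture.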
@@ -69,19 +69,18 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :proxy_protocol, validate: :string, required: false , default: 'http' # Maximum size of the buffer before it gets flushed, defaults to 10MB - config :max_size, validate: :number, default: 10 + config :max_size, validate: :number, required: false , default: 10 # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10 - config :max_interval, validate: :number, default: 10 + config :max_interval, validate: :number, required: false , default: 10 + + # Latch timeout in seconds, defaults to 60 + config :latch_timeout, validate: :number, required: false, default: 60 + default :codec, 'json_lines' def register - # Initialize the custom buffer with size and interval - @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| - flush_buffer(events) - end - @io_mutex = Mutex.new final_mapping = json_mapping @@ -91,13 +90,22 @@ def register max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor) - + + kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping) + kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth) + kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false) + @kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger) + @ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, latch_timeout, executor) + # Deprecation warning for path if @path @logger.warn("The 'path' 
configuration option is deprecated and will be removed in a future release.") end + sleep(30) + # Initialize the custom buffer with size and interval + @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| + flush_buffer(events) + end end @@ -114,7 +122,6 @@ def multi_receive_encoded(events_and_encoded) def close @logger.info("Closing Kusto output plugin") - begin @buffer.shutdown unless @buffer.nil? @logger.info("Buffer shutdown") unless @buffer.nil? @@ -130,7 +137,6 @@ def close @logger.error("Error stopping ingestor: #{e.message}") @logger.error(e.backtrace.join("\n")) end - @logger.info("Kusto output plugin Closed") end diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 7c680266..7ce13db2 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -2,6 +2,8 @@ require 'thread' require 'fileutils' require 'securerandom' +require 'net/http' +require 'uri' module LogStash module Outputs @@ -27,20 +29,18 @@ def initialize(max_size_mb, max_interval, &flush_callback) load_buffer_from_files @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds") - # Start the timer thread after a delay to ensure initializations are completed - Thread.new do - sleep(10) - @buffer_state[:timer] = Thread.new do - loop do - sleep(@buffer_config[:max_interval]) - buffer_flush(force: true) - end + # Start the timer thread + @buffer_state[:timer] = Thread.new do + loop do + sleep(@buffer_config[:max_interval]) + prepare_flush(force: true) end end end def <<(event) while buffer_full? 
do + prepare_flush(force: true) # Flush when buffer is full sleep 0.1 end @@ -54,8 +54,8 @@ def shutdown @buffer_config[:logger].info("Shutting down buffer") @shutdown = true @buffer_state[:timer].kill - buffer_flush(final: true) - clear_buffer_files + prepare_flush(final: true) + flush_buffer_files end private @@ -66,78 +66,103 @@ def buffer_full? end end - def buffer_flush(options = {}) + def prepare_flush(options = {}) force = options[:force] || options[:final] final = options[:final] - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 - end - - items_flushed = 0 - - begin - outgoing_items = [] - outgoing_size = 0 - - @pending_mutex.synchronize do - return 0 if @buffer_state[:pending_size] == 0 + outgoing_items = [] + outgoing_size = 0 - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + @pending_mutex.synchronize do + return 0 if @buffer_state[:pending_size] == 0 + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] - if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] - return 0 - end + if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] + return 0 + end - if force - @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") - elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] - @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") - else - @buffer_config[:logger].info("Flush triggered without specific condition") - end + if time_since_last_flush >= @buffer_config[:max_interval] + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + else + @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") + end - outgoing_items = 
@buffer_state[:pending_items].dup - outgoing_size = @buffer_state[:pending_size] + if @buffer_state[:network_down] + save_buffer_to_file(@buffer_state[:pending_items]) @buffer_state[:pending_items] = [] @buffer_state[:pending_size] = 0 + return 0 end - begin - @flush_callback.call(outgoing_items) # Pass the list of events to the callback - @buffer_state[:network_down] = false # Reset network status after successful flush - flush_buffer_files # Flush buffer files if any exist - rescue => e - @buffer_config[:logger].error("Flush failed: #{e.message}") - @buffer_state[:network_down] = true - save_buffer_to_file(outgoing_items) - end + outgoing_items = @buffer_state[:pending_items].dup + outgoing_size = @buffer_state[:pending_size] + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 + end - @buffer_state[:last_flush] = Time.now.to_i - @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") + if Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).any? + @buffer_config[:logger].info("Flushing all buffer state files") + flush_buffer_files + end - items_flushed = outgoing_items.size + Thread.new { perform_flush(outgoing_items) } + end + + def perform_flush(events, file_path = nil) + + @flush_mutex.lock + + begin + if file_path + unless ::File.exist?(file_path) + return 0 + end + begin + buffer_state = Marshal.load(::File.read(file_path)) + events = buffer_state[:pending_items] + rescue => e + @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}") + return 0 + end + end + @buffer_config[:logger].info("Flushing #{events.size} events, #{events.sum(&:bytesize)} bytes") + @flush_callback.call(events) # Pass the list of events to the callback + @buffer_state[:network_down] = false # Reset network status after successful flush + @buffer_state[:last_flush] = Time.now.to_i + @buffer_config[:logger].info("Flush completed. 
Flushed #{events.size} events, #{events.sum(&:bytesize)} bytes") + + if file_path + ::File.delete(file_path) + @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") + end + + rescue => e + @buffer_config[:logger].error("Flush failed: #{e.message}") + @buffer_state[:network_down] = true + sleep(120) # Wait before checking network availability again + @buffer_config[:logger].info("Retrying flush.") + retry ensure @flush_mutex.unlock end - - items_flushed end + def save_buffer_to_file(events) buffer_state_copy = { pending_items: events, pending_size: events.sum(&:bytesize) } - - ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists - file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log") - ::File.open(file_path, 'w') do |file| - file.write(Marshal.dump(buffer_state_copy)) + begin + ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists + file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log") + ::File.open(file_path, 'w') do |file| + file.write(Marshal.dump(buffer_state_copy)) + end + @buffer_config[:logger].info("Saved #{events.size} events to file: #{file_path}") + rescue => e + @buffer_config[:logger].error("Failed to save buffer to file: #{e.message}") end - @buffer_config[:logger].info("Saved buffer state to file: #{file_path}") end def load_buffer_from_files @@ -156,24 +181,9 @@ def load_buffer_from_files def flush_buffer_files Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - begin - buffer_state = Marshal.load(::File.read(file_path)) - @buffer_config[:logger].info("Flushed from file: #{file_path}") - @flush_callback.call(buffer_state[:pending_items]) - ::File.delete(file_path) - @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") - rescue => e - @buffer_config[:logger].error("Failed to flush buffer state 
file: #{e.message}") - break - end - end - end - - def clear_buffer_files - Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - ::File.delete(file_path) + @buffer_config[:logger].info("Flushing from buffer state file: #{file_path}") + Thread.new { perform_flush([], file_path) } end - @buffer_config[:logger].info("Cleared all buffer state files") end end end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index e991d5a5..969653f9 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,38 +20,39 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(kusto_logstash_configuration, logger, latch_timeout = 60, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool + @latch_timeout = latch_timeout @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth) + #Validate and assign + kusto_logstash_configuration.validate_config() + @kusto_logstash_configuration = kusto_logstash_configuration @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - # If there is CLI Auth, use that instead of managed identity - is_managed_identity = (app_id.nil? && app_key.nil? 
&& !cli_auth) + + is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn # Create a connection string kusto_connection_string = if is_managed_identity if is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id) end else - if cli_auth + if @kusto_logstash_configuration.kusto_auth.cli_auth @logger.warn('*Use of CLI Auth is only for dev-test scenarios. 
This is ***NOT RECOMMENDED*** for production*') - kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url) else @logger.info('Using app id and app key.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant) end end @logger.debug(Gem.loaded_specs.to_s) @@ -62,22 +63,21 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli tuple_utils = Java::org.apache.commons.lang3.tuple # kusto_connection_string.setClientVersionForTracing(name_for_tracing) version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown" - kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); + kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build() 
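The nested conditionals in this hunk pick among four credential paths. As a standalone summary of that precedence (`auth_mode` is a hypothetical helper for illustration; the real code builds a Kusto `ConnectionStringBuilder` in each branch):

```ruby
# Credential precedence as encoded in the ingestor: managed identity wins
# when app_id/app_key are absent and CLI auth is off; "system" selects the
# system-assigned identity; CLI auth is a dev-test fallback; otherwise AAD
# application credentials are used. Hypothetical helper for illustration.
def auth_mode(app_id:, app_key:, managed_identity_id: nil, cli_auth: false)
  if app_id.nil? && app_key.nil? && !cli_auth
    if managed_identity_id.to_s.casecmp("system").zero?
      :system_assigned_managed_identity
    else
      :user_assigned_managed_identity
    end
  elsif cli_auth
    :azure_cli
  else
    :aad_application_credentials
  end
end
```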
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) + if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided + @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) + @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @@ -86,38 +86,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli @logger.debug('Kusto resources are ready.') end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - if cli_auth - @logger.info('Using CLI Auth, this is only for dev-test scenarios. 
This is ***NOT RECOMMENDED*** for production') - else - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') - end - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end - - end - def upload_async(data) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") @@ -132,7 +100,6 @@ def upload_async(data) exception = e end end - # Wait for the task to complete and check for exceptions @workers_pool.shutdown @workers_pool.wait_for_termination @@ -148,36 +115,31 @@ def upload_async(data) def upload(data) @logger.info("Sending data to Kusto") - + if data.size > 0 - ingestionLatch = java.util.concurrent.CountDownLatch.new(1) + thread_exception = nil - Thread.new do + future = java.util.concurrent.CompletableFuture.supplyAsync do begin data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - - # Check the 
ingestion status - status = ingestion_result.getIngestionStatusCollection.get(0) - if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued - raise "Failed upload: #{status.errorCodeString}" - end - @logger.info("Final ingestion status: #{status.status}") + @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) rescue => e @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) - if e.message.include?("network") - raise e - end - ensure - ingestionLatch.countDown() + thread_exception = e end end - # Wait for the ingestion to complete with a timeout - if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS) + begin + future.get(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + rescue java.util.concurrent.TimeoutException => e @logger.error('Ingestion timed out, possible network issue.') - raise 'Ingestion timed out, possible network issue.' + thread_exception = 'Ingestion timed out, possible network issue.' + rescue java.util.concurrent.ExecutionException => e + thread_exception = e.cause end + + # Raise the exception from the thread if it occurred + raise thread_exception if thread_exception else @logger.warn("Data is empty and is not ingested.") end diff --git a/lib/logstash/outputs/kusto/interval.rb b/lib/logstash/outputs/kusto/interval.rb deleted file mode 100755 index 33046309..00000000 --- a/lib/logstash/outputs/kusto/interval.rb +++ /dev/null @@ -1,81 +0,0 @@ -# encoding: utf-8 - -require 'logstash/outputs/base' -require 'logstash/namespace' -require 'logstash/errors' - -class LogStash::Outputs::Kusto < LogStash::Outputs::Base - ## - # Bare-bones utility for running a block of code at an interval. - # - class Interval - ## - # Initializes a new Interval with the given arguments and starts it - # before returning it. 
- # - # @param interval [Integer] (see: Interval#initialize) - # @param procsy [#call] (see: Interval#initialize) - # - # @return [Interval] - # - def self.start(interval, procsy) - new(interval, procsy).tap(&:start) - end - - ## - # @param interval [Integer]: time in seconds to wait between calling the given proc - # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. - def initialize(interval, procsy) - @interval = interval - @procsy = procsy - - # Mutex, ConditionVariable, etc. - @mutex = Mutex.new - @sleeper = ConditionVariable.new - end - - ## - # Starts the interval, or returns if it has already been started. - # - # @return [void] - def start - @mutex.synchronize do - return if @thread && @thread.alive? - - @thread = Thread.new { run } - end - end - - ## - # Stop the interval. - # Does not interrupt if execution is in-progress. - def stop - @mutex.synchronize do - @stopped = true - end - - @thread && @thread.join - end - - ## - # @return [Boolean] - def alive? - @thread && @thread.alive? 
- end - - private - - def run - @mutex.synchronize do - loop do - @sleeper.wait(@mutex, @interval) - break if @stopped - - @procsy.call - end - end - ensure - @sleeper.broadcast - end - end -end diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 00000000..0aecb874 --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,155 @@ +# encoding: utf-8 +# A class that wraps all the configurations into a separate object +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(kusto_ingest, kusto_auth, kusto_proxy, logger) + @logger = logger + @kusto_ingest = kusto_ingest + @kusto_auth = kusto_auth + @kusto_proxy = kusto_proxy + @logger.info("Kusto configuration initialized.") + end # def initialize + + # Configuration + def kusto_ingest + @kusto_ingest + end + def kusto_auth + @kusto_auth + end + def kusto_proxy + @kusto_proxy + end + + def validate_config() + # Add an additional validation and fail this upfront + if @kusto_auth.app_id.to_s.empty? && @kusto_auth.managed_identity_id.to_s.empty?
&& !@kusto_auth.cli_auth + @logger.error('managed_identity_id is not provided, cli_auth is false and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @kusto_proxy.proxy_aad_only && @kusto_proxy.is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', @kusto_proxy.proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @kusto_ingest.database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', @kusto_ingest.database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + if @kusto_ingest.table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', @kusto_ingest.table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + if @kusto_ingest.json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', @kusto_ingest.json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + if not(["https", "http"].include? 
@kusto_proxy.proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', @kusto_proxy.proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + # If all validations pass, the configuration is valid + return true + end #validate_config() + + end # class KustoLogstashConfiguration + class KustoAuthConfiguration + def initialize(app_id, app_key, app_tenant, managed_identity_id, cli_auth) + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + @cli_auth = cli_auth + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? && !cli_auth + @is_system_assigned_managed_identity = @is_managed_identity && 0 == "system".casecmp(managed_identity_id) + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + def is_managed_identity + @is_managed_identity + end + def cli_auth + @cli_auth + end + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + end # class KustoAuthConfiguration + class KustoProxyConfiguration + def initialize(proxy_host, proxy_port, proxy_protocol, proxy_aad_only) + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Is it a direct connection? + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) 
+ end + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoProxyConfiguration + class KustoIngestConfiguration + def initialize(ingest_url, database, table, json_mapping) + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + end + # For ingestion + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + end # class KustoIngestConfiguration + end # module KustoInternal + end # module Outputs +end # module LogStash diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index a077549b..00000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,97 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:cliauth) { false } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:logger) { spy('logger') } - - describe 'Ingestor' do - - it 'does not throw an error when initializing' do - RSpec.configuration.reporter.message("Running test: does not throw an error when initializing") - expect { - ingestor = 
described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - RSpec.configuration.reporter.message("Completed test: does not throw an error when initializing") - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - RSpec.configuration.reporter.message("Running test: doesnt allow database to have some dynamic part with database: #{test_database}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, test_database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow database to have some dynamic part with database: #{test_database}") - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with table: #{test_table}" do - RSpec.configuration.reporter.message("Running test: doesnt allow table to have some dynamic part with table: #{test_table}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, test_table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow table to have some dynamic part with table: #{test_table}") - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with mapping: #{json_mapping}" do - RSpec.configuration.reporter.message("Running test: doesnt allow 
mapping to have some dynamic part with mapping: #{json_mapping}") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, proxy_protocol, logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: doesnt allow mapping to have some dynamic part with mapping: #{json_mapping}") - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - RSpec.configuration.reporter.message("Running test: proxy protocol has to be http or https with proxy protocol: socks") - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cliauth, database, table, json_mapping, proxy_host, proxy_port, 'socks', logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: proxy protocol has to be http or https with proxy protocol: socks") - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - RSpec.configuration.reporter.message("Running test: one of appid or managedid has to be provided with empty managed identity and appid") - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "", cliauth, database, table, json_mapping, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: one of appid or managedid has to be provided with empty managed identity and appid") - end - end - end - -end diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 00000000..cbb09ea7 --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,106 @@ +# encoding: utf-8 
+require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:cliauth) { false } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + + let(:kusto_ingest_base) { LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) } + let(:kusto_auth_base) { LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cliauth) } + let(:kusto_proxy_base) { LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host, proxy_port, proxy_protocol, false) } + + describe '#initialize' do + it 'does not throw an error when initializing' do + # validate_config performs only static checks and attempts no connection, + # so a well-formed configuration should validate without raising + expect { + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base, kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] + + context 'doesnt allow database to have some dynamic part' do + dynamic_name_array.each do |test_database| + it "with database: #{test_database}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, test_database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base, kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow table to have some dynamic part' do + dynamic_name_array.each do |test_table| + it "with table: #{test_table}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, test_table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base, kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow mapping to have some dynamic part' do + dynamic_name_array.each do |json_mapping| + it "with mapping: #{json_mapping}" do + expect { + kusto_ingest = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, json_mapping) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest, kusto_auth_base, kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'proxy protocol has to be http or https' do + 
it "with proxy protocol: socks" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , 'socks', false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid or cli_auth has to be provided' do + it "with empty managed identity and appid" do + expect { + kusto_auth = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new("", app_key, app_tenant, "", false) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth , kusto_proxy_base, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'if proxy_aad is provided' do + it "proxy details should be provided" do + expect { + kusto_proxy = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new("" , "" , proxy_protocol, true) + kustoLogstashOutputConfiguration = described_class.new(kusto_ingest_base, kusto_auth_base , kusto_proxy, logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end +end \ No newline at end of file diff --git a/spec/outputs/kusto_spec.rb b/spec/outputs/kusto_spec.rb deleted file mode 100755 index 967f9a11..00000000 --- a/spec/outputs/kusto_spec.rb +++ /dev/null @@ -1,159 +0,0 @@ -# encoding: utf-8 -require_relative "../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/codecs/plain' -require 'logstash/event' - -describe LogStash::Outputs::Kusto do - - let(:options) { { "path" => "./kusto_tst/%{+YYYY-MM-dd-HH-mm}", - "ingest_url" => "https://ingest-sdkse2etest.eastus.kusto.windows.net/", - "app_id" => "myid", - "app_key" => "mykey", - "app_tenant" => "mytenant", - "database" => "mydatabase", - "table" => "mytable", - 
"json_mapping" => "mymapping", - "proxy_host" => "localhost", - "proxy_port" => 3128, - "proxy_protocol" => "https", - "max_size" => 2000, - "max_interval" => 10 - } } - - describe '#initialize' do - it 'initializes with the correct options' do - RSpec.configuration.reporter.message("Running test: initializes with the correct options") - kusto = described_class.new(options.merge("app_key" => LogStash::Util::Password.new("mykey"))) - expect(kusto.instance_variable_get(:@path)).to eq("./kusto_tst/%{+YYYY-MM-dd-HH-mm}") - expect(kusto.instance_variable_get(:@ingest_url)).to eq("https://ingest-sdkse2etest.eastus.kusto.windows.net/") - expect(kusto.instance_variable_get(:@app_id)).to eq("myid") - expect(kusto.instance_variable_get(:@app_key).value).to eq("mykey") - expect(kusto.instance_variable_get(:@app_tenant)).to eq("mytenant") - expect(kusto.instance_variable_get(:@database)).to eq("mydatabase") - expect(kusto.instance_variable_get(:@table)).to eq("mytable") - expect(kusto.instance_variable_get(:@json_mapping)).to eq("mymapping") - expect(kusto.instance_variable_get(:@proxy_host)).to eq("localhost") - expect(kusto.instance_variable_get(:@proxy_port)).to eq(3128) - expect(kusto.instance_variable_get(:@proxy_protocol)).to eq("https") - expect(kusto.instance_variable_get(:@max_size)).to eq(2000) - expect(kusto.instance_variable_get(:@max_interval)).to eq(10) - RSpec.configuration.reporter.message("Completed test: initializes with the correct options") - end - end - - describe '#multi_receive_encoded' do - it 'processes events and adds them to the buffer' do - RSpec.configuration.reporter.message("Running test: processes events and adds them to the buffer") - kusto = described_class.new(options) - kusto.register - - events = [LogStash::Event.new("message" => "test1"), LogStash::Event.new("message" => "test2")] - encoded_events = events.map { |e| [e, e.to_json] } - - # Temporarily disable automatic flushing for the test - buffer = kusto.instance_variable_get(:@buffer) - 
allow(buffer).to receive(:buffer_flush) - - # Clear the buffer before the test - buffer.instance_variable_set(:@buffer_state, { pending_items: [], pending_size: 0, last_flush: Time.now.to_i }) - - kusto.multi_receive_encoded(encoded_events) - - pending_items = buffer.instance_variable_get(:@buffer_state)[:pending_items] - RSpec.configuration.reporter.message("Pending items in buffer: #{pending_items.inspect}") - - expect(pending_items.size).to eq(2) - RSpec.configuration.reporter.message("Completed test: processes events and adds them to the buffer") - end - - it 'handles errors during event processing' do - RSpec.configuration.reporter.message("Running test: handles errors during event processing") - kusto = described_class.new(options) - kusto.register - - allow(kusto.instance_variable_get(:@buffer)).to receive(:<<).and_raise(StandardError.new("Test error")) - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - - expect { kusto.multi_receive_encoded(encoded_events) }.not_to raise_error - RSpec.configuration.reporter.message("Completed test: handles errors during event processing") - end - end - - describe '#register' do - it 'raises an error for invalid configurations' do - RSpec.configuration.reporter.message("Running test: raises an error for invalid configurations") - invalid_options = options.merge("ingest_url" => nil) - expect { described_class.new(invalid_options).register }.to raise_error(LogStash::ConfigurationError) - RSpec.configuration.reporter.message("Completed test: raises an error for invalid configurations") - end - end - - describe '#flush_buffer' do - - it 'flushes the buffer when max_size is reached' do - RSpec.configuration.reporter.message("Running test: flushes the buffer when max_size is reached") - kusto = described_class.new(options.merge("max_size" => 1)) # Set max_size to 1MB for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = 
events.map { |e| [e, e.to_json] } - # Ensure upload_async is called only once - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything).once - kusto.multi_receive_encoded(encoded_events) - - # Trigger the buffer flush manually - buffer = kusto.instance_variable_get(:@buffer) - buffer.send(:buffer_flush, force: true) - - RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_size is reached") - end - - it 'flushes the buffer when max_interval is reached' do - RSpec.configuration.reporter.message("Running test: flushes the buffer when max_interval is reached") - kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - kusto.multi_receive_encoded(encoded_events) - sleep(2) # Wait for the interval to pass - - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything) - kusto.flush_buffer(encoded_events) # Pass the encoded events here - RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_interval is reached") - end - - it 'eventually flushes without receiving additional events based on max_interval' do - RSpec.configuration.reporter.message("Running test: eventually flushes without receiving additional events based on max_interval") - kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing - kusto.register - - events = [LogStash::Event.new("message" => "test1")] - encoded_events = events.map { |e| [e, e.to_json] } - kusto.multi_receive_encoded(encoded_events) - - # Wait for the interval to pass - sleep(2) - - expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything) - kusto.flush_buffer(encoded_events) # Pass the encoded events here - RSpec.configuration.reporter.message("Completed test: 
eventually flushes without receiving additional events based on max_interval") - end - end - - describe '#close' do - it 'shuts down the buffer and ingestor' do - RSpec.configuration.reporter.message("Running test: shuts down the buffer and ingestor") - kusto = described_class.new(options) - kusto.register - - expect(kusto.instance_variable_get(:@buffer)).to receive(:shutdown) - expect(kusto.instance_variable_get(:@ingestor)).to receive(:stop) - - kusto.close - RSpec.configuration.reporter.message("Completed test: shuts down the buffer and ingestor") - end - end -end \ No newline at end of file
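For reviewers, here is a standalone sketch of the static checks that `KustoLogstashConfiguration#validate_config` performs, using the same `FIELD_REF` regex and `casecmp` comparison that appear in the new kustoLogstashConfiguration.rb above. The helper names (`dynamic?`, `valid_proxy_protocol?`, `system_assigned?`) are illustrative only and are not part of the plugin's API.

```ruby
# Sketch of the static validation rules in validate_config.
# FIELD_REF matches Logstash sprintf references like %{name};
# database, table, and json_mapping must not contain them.
FIELD_REF = /%\{[^}]+\}/

# true when a config value contains a dynamic %{...} reference
def dynamic?(value)
  !!(value =~ FIELD_REF)
end

# validate_config only accepts http or https for proxy_protocol
def valid_proxy_protocol?(protocol)
  ["https", "http"].include?(protocol)
end

# a system-assigned managed identity is selected by the literal "system",
# compared case-insensitively as in KustoAuthConfiguration
def system_assigned?(managed_identity_id)
  0 == "system".casecmp(managed_identity_id)
end

puts dynamic?("mydatabase")         # => false
puts dynamic?("db-%{name}")         # => true
puts valid_proxy_protocol?("socks") # => false
puts system_assigned?("System")     # => true
```

The sketch runs with plain `ruby` and needs no Logstash runtime, since these checks are pure string and regex operations.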