From d3093e4b4458ac9f163f67c532ee1bad8ae0223e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 29 Jan 2025 13:51:09 +0000 Subject: [PATCH 01/27] integration tests: switch log input to filestream in filebeat (#16983) Log input has been deprecated in filebeat 9.0.0 and throws an error if it's present in the configuration. This commit switches the configuration to the "filestream" input. --- qa/integration/specs/beats_input_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qa/integration/specs/beats_input_spec.rb b/qa/integration/specs/beats_input_spec.rb index 95ce50e82fc..920c59b5009 100644 --- a/qa/integration/specs/beats_input_spec.rb +++ b/qa/integration/specs/beats_input_spec.rb @@ -84,7 +84,7 @@ let(:filebeat_config) do { "filebeat" => { - "inputs" => [{ "paths" => [log_path], "input_type" => "log" }], + "inputs" => [{ "paths" => [log_path], "type" => "filestream" }], "registry.path" => registry_file, "scan_frequency" => "1s" }, @@ -109,7 +109,7 @@ let(:filebeat_config) do { "filebeat" => { - "inputs" => [{ "paths" => [log_path], "input_type" => "log" }], + "inputs" => [{ "paths" => [log_path], "type" => "filestream" }], "registry.path" => registry_file, "scan_frequency" => "1s" }, @@ -136,7 +136,7 @@ let(:filebeat_config) do { "filebeat" => { - "inputs" => [{ "paths" => [log_path], "input_type" => "log" }], + "inputs" => [{ "paths" => [log_path], "type" => "filestream" }], "registry.path" => registry_file, "scan_frequency" => "1s" }, From 6660395f4dcf08a58d86d8018eca9061b36b89ab Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 29 Jan 2025 19:04:16 +0100 Subject: [PATCH 02/27] Update branches.json for 8.18 (#16981) Add 8.18 to CI inventory branches --- ci/branches.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ci/branches.json b/ci/branches.json index 5b81cb84e92..20b4fb176c6 100644 --- a/ci/branches.json +++ b/ci/branches.json @@ -13,6 +13,9 @@ { "branch": "8.17" }, + { + "branch": "8.18" + 
}, { "branch": "7.17" } From 8a41a4e0e5b37f38bfa3baa03e359144a143e65f Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 29 Jan 2025 17:31:37 -0500 Subject: [PATCH 03/27] Remove sample breaking change from breaking changes doc (#16978) --- docs/static/breaking-changes-90.asciidoc | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/docs/static/breaking-changes-90.asciidoc b/docs/static/breaking-changes-90.asciidoc index 34413d0f7cb..88a5fc80685 100644 --- a/docs/static/breaking-changes-90.asciidoc +++ b/docs/static/breaking-changes-90.asciidoc @@ -3,16 +3,6 @@ === Breaking changes in 9.0 coming[9.0.0] -[discrete] -[[sample-change-9.0]] -===== Breaking change number one (sample) - -Include: - -* change -* user impact/decription/value prop -* link to relevant docs for more information - [discrete] [[ssl-settings-9.0]] ===== Changes to SSL settings in {ls} plugins From 70a6c9aea6c00854e3ad138ea22e0c73afbb649d Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 29 Jan 2025 17:33:17 -0500 Subject: [PATCH 04/27] [wip] Changing upgrade docs to refer to `9.0` instead of `8.0` (#16977) --- docs/static/upgrading.asciidoc | 45 ++++++++++++++++------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/docs/static/upgrading.asciidoc b/docs/static/upgrading.asciidoc index 3b1a26da550..fc26dd95fdc 100644 --- a/docs/static/upgrading.asciidoc +++ b/docs/static/upgrading.asciidoc @@ -24,14 +24,14 @@ See the following topics for information about upgrading Logstash: * <> * <> * <> -* <> +* <> [discrete] ==== When to upgrade Fresh installations can and should start with the same version across the Elastic Stack. -Elasticsearch 8.0 does not require Logstash 8.0. An Elasticsearch 8.0 cluster +Elasticsearch 9.0 does not require Logstash 9.0. An Elasticsearch 9.0 cluster will happily receive data from earlier versions of Logstash via the default HTTP communication layer. 
This provides some flexibility to decide when to upgrade Logstash relative to an Elasticsearch upgrade. It may or may not be @@ -39,23 +39,23 @@ convenient for you to upgrade them together, and it is not required to be done at the same time as long as Elasticsearch is upgraded first. You should upgrade in a timely manner to get the performance improvements that -come with Logstash 8.0, but do so in the way that makes the most sense for your +come with Logstash 9.0, but do so in the way that makes the most sense for your environment. [discrete] ==== When not to upgrade -If any Logstash plugin that you require is not compatible with Logstash 8.0, then you should wait until it is ready +If any Logstash plugin that you require is not compatible with Logstash 9.0, then you should wait until it is ready before upgrading. -Although we make great efforts to ensure compatibility, Logstash 8.0 is not completely backwards compatible. -As noted in the Elastic Stack upgrade guide, you should not upgrade Logstash 8.0 before you upgrade Elasticsearch 8.0. +Although we make great efforts to ensure compatibility, Logstash 9.0 is not completely backwards compatible. +As noted in the Elastic Stack upgrade guide, you should not upgrade Logstash 9.0 before you upgrade Elasticsearch 9.0. This is both -practical and because some Logstash 8.0 plugins may attempt to use features of Elasticsearch 8.0 that did not exist +practical and because some Logstash 9.0 plugins may attempt to use features of Elasticsearch 9.0 that did not exist in earlier versions. For example, if you attempt to send the 8.x template to a cluster before -Elasticsearch 8.0, then all indexing likely fail. +Elasticsearch 9.0, then all indexing likely fail. If you use your own custom template with Logstash, then this issue can be ignored. @@ -66,10 +66,10 @@ This procedure uses <> to upgrade Logstas . Shut down your Logstash pipeline, including any inputs that send events to Logstash. . 
Using the directions in the <> section, update your repository -links to point to the 8.x repositories. +links to point to the 9.x repositories. . Run the `apt-get upgrade logstash` or `yum update logstash` command as appropriate for your operating system. . Test your configuration file with the `logstash --config.test_and_exit -f ` command. Configuration options for -some Logstash plugins have changed in the 8.x release. +some Logstash plugins have changed in the 9.x release. . Restart your Logstash pipeline after you have updated your configuration file. [[upgrading-using-direct-download]] @@ -91,18 +91,18 @@ some Logstash plugins have changed. [[upgrading-minor-versions]] === Upgrading between minor versions -As a general rule, you can upgrade between minor versions (for example, 8.x to -8.y, where x < y) by simply installing the new release and restarting {ls}. +As a general rule, you can upgrade between minor versions (for example, 9.x to +9.y, where x < y) by simply installing the new release and restarting {ls}. {ls} typically maintains backwards compatibility for configuration settings and exported fields. Please review the <> for potential exceptions. -Upgrading between non-consecutive major versions (6.x to 8.x, for example) is +Upgrading between non-consecutive major versions (7.x to 9.x, for example) is not supported. -[[upgrading-logstash-8.0]] -=== Upgrading Logstash to 8.0 +[[upgrading-logstash-9.0]] +=== Upgrading Logstash to 9.0 Before upgrading Logstash: @@ -111,26 +111,23 @@ Before upgrading Logstash: + There you can find info on these topics and more: -** <> -** <> -** <> - +** <> If you are installing Logstash with other components in the Elastic Stack, also see the {stack-ref}/index.html[Elastic Stack installation and upgrade documentation]. -NOTE: Upgrading between non-consecutive major versions (6.x to 8.x, for example) is not supported. -We recommend that you upgrade to {prev-major-last}, and then upgrade to 8.0. 
+NOTE: Upgrading between non-consecutive major versions (7.x to 9.x, for example) is not supported. +We recommend that you upgrade to {prev-major-last}, and then upgrade to 9.0. [discrete] [[upgrade-to-previous]] -==== Upgrade to {ls} {prev-major-last} before upgrading to 8.0 +==== Upgrade to {ls} {prev-major-last} before upgrading to 9.0 -If you haven't already, upgrade to version {prev-major-last} before you upgrade to 8.0. If +If you haven't already, upgrade to version {prev-major-last} before you upgrade to 9.0. If you're using other products in the {stack}, upgrade {ls} as part of the {stack-ref}/upgrading-elastic-stack.html[{stack} upgrade process]. -TIP: Upgrading to {ls} {prev-major-last} gives you a head-start on new 8.0 features. +TIP: Upgrading to {ls} {prev-major-last} gives you a head-start on new 9.0 features. This step helps reduce risk and makes roll backs easier if you hit a snag. From 7378b85f41f4fc302ff04754639ff97b2e7ef720 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 29 Jan 2025 15:20:47 -0800 Subject: [PATCH 05/27] Adding elastic_integration upgrade guidelines. (#16979) * Adding elastic_integration upgrade guidelines. --- docs/static/upgrading.asciidoc | 47 +++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/docs/static/upgrading.asciidoc b/docs/static/upgrading.asciidoc index fc26dd95fdc..c8c74d548c6 100644 --- a/docs/static/upgrading.asciidoc +++ b/docs/static/upgrading.asciidoc @@ -17,7 +17,7 @@ and becomes a new node in the monitoring data. =========================================== If you're upgrading other products in the stack, also read the -{stack-ref}/index.html[Elastic Stack Installation and Upgrade Guide]. +{stack-ref}/index.html[Elastic Stack Installation and Upgrade Guide]. 
See the following topics for information about upgrading Logstash: @@ -36,7 +36,8 @@ will happily receive data from earlier versions of Logstash via the default HTTP communication layer. This provides some flexibility to decide when to upgrade Logstash relative to an Elasticsearch upgrade. It may or may not be convenient for you to upgrade them together, and it is not required to be done -at the same time as long as Elasticsearch is upgraded first. +at the same time as long as Elasticsearch is upgraded first. However, there are special plugin cases for example, if your pipeline includes <> plugin. +See <> section for details. You should upgrade in a timely manner to get the performance improvements that come with Logstash 9.0, but do so in the way that makes the most sense for your @@ -48,16 +49,48 @@ environment. If any Logstash plugin that you require is not compatible with Logstash 9.0, then you should wait until it is ready before upgrading. -Although we make great efforts to ensure compatibility, Logstash 9.0 is not completely backwards compatible. -As noted in the Elastic Stack upgrade guide, you should not upgrade Logstash 9.0 before you upgrade Elasticsearch 9.0. -This is both -practical and because some Logstash 9.0 plugins may attempt to use features of Elasticsearch 9.0 that did not exist +Although we make great efforts to ensure compatibility, Logstash 9.0 is not completely backwards compatible. +As noted in the Elastic Stack upgrade guide, you should not upgrade Logstash 9.0 before you upgrade Elasticsearch 9.0. +This is both practical and because some Logstash 9.0 plugins may attempt to use features of Elasticsearch 9.0 that did not exist in earlier versions. For example, if you attempt to send the 8.x template to a cluster before -Elasticsearch 9.0, then all indexing likely fail. +Elasticsearch 9.0, then all indexing likely fail. If you use your own custom template with Logstash, then this issue can be ignored. 
+Another example is when your pipeline utilizes the <> plugin. +In such cases, the plugin may encounter issues loading and executing deprecated integrations or features that have been removed in newer versions. +This can lead to disruptions in your pipeline's functionality, especially if your workflow relies on these outdated components. +For a comprehensive understanding of how to handle such scenarios and ensure compatibility, refer to the <> section in this documentation. + +[discrete] +[[upgrading-when-elastic_integration-in-pipeline]] +==== When `elastic_integration` is in {ls} pipeline + +<> plugin requires a special attention due to its dependencies on various components of the stack such as {es}, {kib} and {ls}. +Any updates, deprecations, or changes in the stack products can directly impact the functionality of the plugin. + +*When upgrading {es}* + +This plugin is compiled with a specific version of {es} and embeds {es} Ingest Node components that match the `major.minor` stack version. Therefore, we recommend using a plugin version that aligns with the `major.minor` version of your stack. + +If the versions do not match, the plugin may encounter issues such as failing to load or execute pipelines. For example, if your {es} version is newer than the plugin, the plugin may not support new features introduced in the updated {es} version. +Conversely, if your {es} version is older, the plugin may rely on features that have been deprecated or removed in your {es} version. + +*When upgrading {kib}* + +When you upgrade {kib}, {kib} downloads the latest version of the integrations through {fleet-guide}/fleet-overview.html#package-registry-intro[Elastic Package Registry]. +As part of the upgrade process, you will also have the opportunity to review and upgrade your currently installed integrations to their latest versions. +However, we strongly recommend upgrading the <> plugin before upgrading {kib} and {es}. 
+This is because <> plugin pulls and processes the ingest pipelines associated with the installed integrations. +These pipelines are then executed using the {es} Ingest Node components that the plugin was compiled with. +If {es} or {es} is upgraded first, there is a risk of incompatibility between the plugin's ingest componenets and the newer versions of {es}'s Ingest Node features or {kib}'s integration definitions. + +*When upgrading {ls}* + +This plugin is by default embedded in {ls} core. When you upgrade {ls}, new version of the plugin is installed. +The plugin is backward compatible accross {ls} 8.x versions. However, if you are considering to upgrade {ls} only (not the plugin), there are exceptions cases, such as JDK compatibility which require matching certain {ls} versions. +We recommend visiting <> guide considering the {ls} version you are upgrading to. [[upgrading-using-package-managers]] === Upgrading using package managers From 51ab5d85d28d455e18d508ddfd0654cfc27fb46d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Thu, 30 Jan 2025 11:17:04 +0000 Subject: [PATCH 06/27] upgrade jdk to 21.0.6+7 (#16932) --- versions.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/versions.yml b/versions.yml index 9151224b09b..32fdd32b5f0 100644 --- a/versions.yml +++ b/versions.yml @@ -7,8 +7,8 @@ logstash-core-plugin-api: 2.1.16 bundled_jdk: # for AdoptOpenJDK/OpenJDK jdk-14.0.1+7.1, the revision is 14.0.1 while the build is 7.1 vendor: "adoptium" - revision: 21.0.5 - build: 11 + revision: 21.0.6 + build: 7 # jruby must reference a *released* version of jruby which can be downloaded from the official download url # *and* for which jars artifacts are published for compile-time From 217287998949b8ce3172710eb5bac455d701050d Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Thu, 30 Jan 2025 16:39:27 +0100 Subject: [PATCH 07/27] github-action: Add AsciiDoc freeze warning (#16969) --- .../workflows/comment-on-asciidoc-changes.yml 
| 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/comment-on-asciidoc-changes.yml diff --git a/.github/workflows/comment-on-asciidoc-changes.yml b/.github/workflows/comment-on-asciidoc-changes.yml new file mode 100644 index 00000000000..8e5f836b148 --- /dev/null +++ b/.github/workflows/comment-on-asciidoc-changes.yml @@ -0,0 +1,21 @@ +--- +name: Comment on PR for .asciidoc changes + +on: + # We need to use pull_request_target to be able to comment on PRs from forks + pull_request_target: + types: + - synchronize + - opened + - reopened + branches: + - main + - master + - "9.0" + +jobs: + comment-on-asciidoc-change: + permissions: + contents: read + pull-requests: write + uses: elastic/docs-builder/.github/workflows/comment-on-asciidoc-changes.yml@main From 786911fa6d66c6cb67d7ab84944741b88e93146d Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Thu, 30 Jan 2025 11:37:30 -0800 Subject: [PATCH 08/27] Add 9.0 branch to the CI branches definition (#16997) --- ci/branches.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ci/branches.json b/ci/branches.json index 20b4fb176c6..a5afd2f4d0e 100644 --- a/ci/branches.json +++ b/ci/branches.json @@ -4,6 +4,9 @@ { "branch": "main" }, + { + "branch": "9.0" + }, { "branch": "8.x" }, From 14c16de0c5fdfc817799d04dcdc7526298558101 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Thu, 30 Jan 2025 15:13:58 -0800 Subject: [PATCH 09/27] Core version bump to 9.1.0 (#16991) --- versions.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/versions.yml b/versions.yml index 32fdd32b5f0..1f99ba0ebb3 100644 --- a/versions.yml +++ b/versions.yml @@ -1,7 +1,7 @@ --- # alpha and beta qualifiers are now added via VERSION_QUALIFIER environment var -logstash: 9.0.0 -logstash-core: 9.0.0 +logstash: 9.1.0 +logstash-core: 9.1.0 logstash-core-plugin-api: 2.1.16 bundled_jdk: From 
32cc85b9a77f9566b4c8a5e76174293057bfa50e Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Fri, 31 Jan 2025 10:32:16 -0800 Subject: [PATCH 10/27] Add short living 9.0 next and update main in CI release version definition. (#17008) --- ci/logstash_releases.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/logstash_releases.json b/ci/logstash_releases.json index 80e564a6ed9..6b695636968 100644 --- a/ci/logstash_releases.json +++ b/ci/logstash_releases.json @@ -10,6 +10,7 @@ "8.current": "8.17.2-SNAPSHOT", "8.next": null, "8.future": "8.18.0-SNAPSHOT", - "main": "9.0.0-SNAPSHOT" + "9.next": "9.0.0-SNAPSHOT", + "main": "9.1.0-SNAPSHOT" } } From 1c8cf546c20517f5aba706ebdf177c1b9c2bd9d6 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 5 Feb 2025 10:25:08 +0100 Subject: [PATCH 11/27] Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968) Permit to use effectively the tokenizer also in context where a line is bigger than a limit. Fixes an issues related to token size limit error, when the offending token was bigger than the input fragment in happened that the tokenzer wasn't unable to recover the token stream from the first delimiter after the offending token but messed things, loosing part of tokens. ## How solve the problem This is a second take to fix the processing of tokens from the tokenizer after a buffer full error. The first try #16482 was rollbacked to the encoding error #16694. The first try failed on returning the tokens in the same encoding of the input. This PR does a couple of things: - accumulates the tokens, so that after a full condition can resume with the next tokens after the offending one. - respect the encoding of the input string. Use `concat` method instead of `addAll`, which avoid to convert RubyString to String and back to RubyString. 
When return the head `StringBuilder` it enforce the encoding with the input charset. --- .../logstash/common/BufferedTokenizerExt.java | 101 +++++++++-- .../common/BufferedTokenizerExtTest.java | 161 ++++++++++++++++++ ...BufferedTokenizerExtWithDelimiterTest.java | 66 +++++++ ...BufferedTokenizerExtWithSizeLimitTest.java | 111 ++++++++++++ 4 files changed, 426 insertions(+), 13 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index be1c64d2356..e2c476520c1 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -23,14 +23,18 @@ import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyClass; +import org.jruby.RubyEncoding; import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.util.ByteList; import org.logstash.RubyUtil; +import java.nio.charset.Charset; + @JRubyClass(name = "BufferedTokenizer") public class BufferedTokenizerExt extends RubyObject { @@ -40,10 +44,13 @@ public class BufferedTokenizerExt extends RubyObject { freeze(RubyUtil.RUBY.getCurrentContext()); private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); + private StringBuilder headToken = new StringBuilder(); private RubyString delimiter = NEW_LINE; private int sizeLimit; private boolean hasSizeLimit; private int inputSize; + private 
boolean bufferFullErrorNotified = false; + private String encodingName; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -80,23 +87,76 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @JRubyMethod @SuppressWarnings("rawtypes") public RubyArray extract(final ThreadContext context, IRubyObject data) { + RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); + encodingName = encoding.getEncoding().getCharsetName(); final RubyArray entities = data.convertToString().split(delimiter, -1); + if (!bufferFullErrorNotified) { + input.clear(); + input.concat(entities); + } else { + // after a full buffer signal + if (input.isEmpty()) { + // after a buffer full error, the remaining part of the line, till next delimiter, + // has to be consumed, unless the input buffer doesn't still contain fragments of + // subsequent tokens. + entities.shift(context); + input.concat(entities); + } else { + // merge last of the input with first of incoming data segment + if (!entities.isEmpty()) { + RubyString last = ((RubyString) input.pop(context)); + RubyString nextFirst = ((RubyString) entities.shift(context)); + entities.unshift(last.concat(nextFirst)); + input.concat(entities); + } + } + } + if (hasSizeLimit) { - final int entitiesSize = ((RubyString) entities.first()).size(); + if (bufferFullErrorNotified) { + bufferFullErrorNotified = false; + if (input.isEmpty()) { + return RubyUtil.RUBY.newArray(); + } + } + final int entitiesSize = ((RubyString) input.first()).size(); if (inputSize + entitiesSize > sizeLimit) { - throw new IllegalStateException("input buffer full"); + bufferFullErrorNotified = true; + headToken = new StringBuilder(); + String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); + inputSize = 0; + input.shift(context); // consume the token 
fragment that generates the buffer full + throw new IllegalStateException(errorMessage); } this.inputSize = inputSize + entitiesSize; } - input.append(entities.shift(context)); - if (entities.isEmpty()) { + + if (input.getLength() < 2) { + // this is a specialization case which avoid adding and removing from input accumulator + // when it contains just one element + headToken.append(input.shift(context)); // remove head return RubyUtil.RUBY.newArray(); } - entities.unshift(input.join(context)); - input.clear(); - input.append(entities.pop(context)); - inputSize = ((RubyString) input.first()).size(); - return entities; + + if (headToken.length() > 0) { + // if there is a pending token part, merge it with the first token segment present + // in the accumulator, and clean the pending token part. + headToken.append(input.shift(context)); // append buffer to first element and + // create new RubyString with the data specified encoding + RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); + input.unshift(encodedHeadToken); // reinsert it into the array + headToken = new StringBuilder(); + } + headToken.append(input.pop(context)); // put the leftovers in headToken for later + inputSize = headToken.length(); + return input; + } + + private RubyString toEncodedRubyString(ThreadContext context, String input) { + // Depends on the encodingName being set by the extract method, could potentially raise if not set. 
+ RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName)))); + result.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + return result; } /** @@ -108,15 +168,30 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { */ @JRubyMethod public IRubyObject flush(final ThreadContext context) { - final IRubyObject buffer = input.join(context); - input.clear(); + final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); + headToken = new StringBuilder(); inputSize = 0; - return buffer; + + // create new RubyString with the last data specified encoding, if exists + RubyString encodedHeadToken; + if (encodingName != null) { + encodedHeadToken = toEncodedRubyString(context, buffer.toString()); + } else { + // When used with TCP input it could be that on socket connection the flush method + // is invoked while no invocation of extract, leaving the encoding name unassigned. + // In such case also the headToken must be empty + if (!buffer.toString().isEmpty()) { + throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); + } + encodedHeadToken = (RubyString) buffer; + } + + return encodedHeadToken; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0)); + return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..524abb36ed5 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyEncoding; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeASingleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) 
sut.extract(context, RubyUtil.RUBY.newString("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n")); + assertEquals(List.of(""), tokens); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioning() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + sut.extract(context, rubyInput); + IRubyObject capitalAInLatin1 = 
RubyString.newString(RUBY, new byte[]{(byte) 0x41}) + .force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£A", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("A", lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void givenDirectFlushInvocationUTF8EncodingIsApplied() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("", 
lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("UTF-8", encoding.toString()); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java new file mode 100644 index 00000000000..19872e66c3c --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + + assertEquals(List.of("foo", "b|r"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + assertEquals(List.of("foo"), tokens); + } +} diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java new file mode 100644 index 00000000000..9a07242369d --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. 
licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.*; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; + sut.init(context, args); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, 
RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); + assertEquals(List.of("bbbb"), tokens); + } + + @Test + public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + //first buffer full on 13 "a" letters + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + // second buffer full on 11 "b" letters + Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); + }); + 
assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
+
+        // now should resume processing on c and d
+        RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n"));
+        assertEquals(List.of("ccccc", "ddd"), tokens);
+    }
+}
\ No newline at end of file

From e23da7985cae6d7bc44857f8389dde9fe0386ff4 Mon Sep 17 00:00:00 2001
From: Mashhur <99575341+mashhurs@users.noreply.github.com>
Date: Wed, 5 Feb 2025 11:07:56 -0800
Subject: [PATCH 12/27] Release note placeholder might be empty, making parsing
 lines nil tolerant. (#17026)

---
 tools/release/generate_release_notes.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/release/generate_release_notes.rb b/tools/release/generate_release_notes.rb
index f2a6c1d1289..e1d88198e3c 100755
--- a/tools/release/generate_release_notes.rb
+++ b/tools/release/generate_release_notes.rb
@@ -48,8 +48,8 @@
 coming_tag_index += 1 if coming_tag_index
 release_notes_entry_index = coming_tag_index || release_notes.find_index {|line| line.match(/^\[\[logstash/) }
 
-report << "[[logstash-#{current_release_dashes}]]" unless release_notes.any? { |line| line.match(/^\[\[logstash-#{current_release_dashes}/) }
-report << "=== Logstash #{current_release} Release Notes\n" unless release_notes.any? { |line| line.match(/^=== Logstash #{current_release}/)}
+report << "[[logstash-#{current_release_dashes}]]" unless release_notes.any? { |line| line&.match(/^\[\[logstash-#{current_release_dashes}/) }
+report << "=== Logstash #{current_release} Release Notes\n" unless release_notes.any?
{ |line| line&.match(/^=== Logstash #{current_release}/)} plugin_changes = {} From c7204fd7d62e593de433228581c5036e80474385 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Fri, 7 Feb 2025 13:05:23 +0200 Subject: [PATCH 13/27] Don't honor VERSION_QUALIFIER if set but empty (#17032) PR #17006 revealed that the `VERSION_QUALIFIER` env var gets honored in various scripts when present but empty. This shouldn't be the case as the DRA process is designed to gracefully ignore empty values for this variable. This commit changes various ruby scripts to not treat "" as truthy. Bash scripts (used by CI etc.) are already ok with this as part of refactorings done in #16907. --------- Co-authored-by: Andrea Selva --- logstash-core-plugin-api/logstash-core-plugin-api.gemspec | 2 +- logstash-core/logstash-core.gemspec | 2 +- qa/docker/spec/spec_helper.rb | 2 +- rakelib/artifacts.rake | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/logstash-core-plugin-api/logstash-core-plugin-api.gemspec b/logstash-core-plugin-api/logstash-core-plugin-api.gemspec index 4a599d84b25..05e0b8cfc84 100644 --- a/logstash-core-plugin-api/logstash-core-plugin-api.gemspec +++ b/logstash-core-plugin-api/logstash-core-plugin-api.gemspec @@ -13,7 +13,7 @@ if File.exist?(project_versions_yaml_path) # each time we build the logstash-core gem original_lines = IO.readlines(project_versions_yaml_path) # introduce the version qualifier (e.g. beta1, rc1) into the copied yml so it's displayed by Logstash - if ENV['VERSION_QUALIFIER'] + unless ENV['VERSION_QUALIFIER'].to_s.strip.empty? logstash_version_line = original_lines.find {|line| line.match(/^logstash:/) } logstash_version_line.chomp! 
logstash_version_line << "-#{ENV['VERSION_QUALIFIER']}\n" diff --git a/logstash-core/logstash-core.gemspec b/logstash-core/logstash-core.gemspec index 5efed823547..bb1a043e189 100644 --- a/logstash-core/logstash-core.gemspec +++ b/logstash-core/logstash-core.gemspec @@ -18,7 +18,7 @@ if File.exist?(project_versions_yaml_path) # each time we build the logstash-core gem original_lines = IO.readlines(project_versions_yaml_path) # introduce the version qualifier (e.g. beta1, rc1) into the copied yml so it's displayed by Logstash - if ENV['VERSION_QUALIFIER'] + unless ENV['VERSION_QUALIFIER'].to_s.strip.empty? logstash_version_line = original_lines.find {|line| line.match(/^logstash:/) } logstash_version_line.chomp! logstash_version_line << "-#{ENV['VERSION_QUALIFIER']}\n" diff --git a/qa/docker/spec/spec_helper.rb b/qa/docker/spec/spec_helper.rb index b62d4d1f6e0..f79514d5a48 100644 --- a/qa/docker/spec/spec_helper.rb +++ b/qa/docker/spec/spec_helper.rb @@ -13,7 +13,7 @@ def version end def qualified_version - qualifier = ENV['VERSION_QUALIFIER'] + qualifier = ENV['VERSION_QUALIFIER'].to_s.strip.empty? ? nil : ENV['VERSION_QUALIFIER'] qualified_version = qualifier ? [version, qualifier].join("-") : version ENV["RELEASE"] == "1" ? qualified_version : [qualified_version, "SNAPSHOT"].join("-") end diff --git a/rakelib/artifacts.rake b/rakelib/artifacts.rake index dca08a51039..2788debe0a5 100644 --- a/rakelib/artifacts.rake +++ b/rakelib/artifacts.rake @@ -17,7 +17,7 @@ namespace "artifact" do SNAPSHOT_BUILD = ENV["RELEASE"] != "1" - VERSION_QUALIFIER = ENV["VERSION_QUALIFIER"] + VERSION_QUALIFIER = ENV["VERSION_QUALIFIER"].to_s.strip.empty? ? nil : ENV["VERSION_QUALIFIER"] LOCAL_ARTIFACTS = ENV["LOCAL_ARTIFACTS"] || "true" PACKAGE_SUFFIX = SNAPSHOT_BUILD ? 
"-SNAPSHOT" : "" From 5573b5ad777eef7a4204f2376bd601090d7866bc Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Fri, 7 Feb 2025 21:30:11 +0000 Subject: [PATCH 14/27] fix logstash-keystore to accept spaces in values when added via stdin (#17039) This commit preserves spaces in values, ensuring that multi-word strings are stored as intended. Prior to this change, `logstash-keystore` incorrectly handled values containing spaces, causing only the first word to be stored. --- .../java/org/logstash/secret/cli/Terminal.java | 2 +- qa/integration/specs/secret_store_spec.rb | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/secret/cli/Terminal.java b/logstash-core/src/main/java/org/logstash/secret/cli/Terminal.java index ff0b12b0afc..a27590d4bb7 100644 --- a/logstash-core/src/main/java/org/logstash/secret/cli/Terminal.java +++ b/logstash-core/src/main/java/org/logstash/secret/cli/Terminal.java @@ -81,7 +81,7 @@ public char[] readSecret() { if (useConsole) { return System.console().readPassword(); } else { - return scanner.next().toCharArray(); + return scanner.nextLine().toCharArray(); } } diff --git a/qa/integration/specs/secret_store_spec.rb b/qa/integration/specs/secret_store_spec.rb index 5d5a39d5331..2c677bdcb22 100644 --- a/qa/integration/specs/secret_store_spec.rb +++ b/qa/integration/specs/secret_store_spec.rb @@ -87,6 +87,20 @@ expect(logstash.stderr_and_stdout).to match(/\\"\$\{tag1\}\\"/) end + it "add value that contains a space" do + # add two key value pairs to keystore + # hello => hello world + # bye => bye + key = "hello" + value = "hello world" + @logstash.run_cmd(["bash", "-c", "echo -e '#{value}\\nbye' | LOGSTASH_KEYSTORE_PASS=#{logstash_keystore_passowrd} #{@logstash.logstash_home}/bin/logstash-keystore --path.settings #{settings_dir} add #{key} bye"]) + + test_env["LOGSTASH_KEYSTORE_PASS"] = logstash_keystore_passowrd + logstash = 
@logstash.run_cmd(["bin/logstash", "-e", "input{ generator{ count => 1 tags => ['${#{key}}', '${bye}'] }}", "--path.settings", settings_dir], true, test_env) + expect(logstash.stderr_and_stdout).to match(/#{value}/) + expect(logstash.stderr_and_stdout).to match(/bye/) + end + context "won't start" do it "with the wrong password when variables are in settings" do test_env["LOGSTASH_KEYSTORE_PASS"] = "WRONG_PASSWRD" From a847ef77646c49dc56233a5cb6a9f1925c1b736a Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 11 Feb 2025 10:54:21 -0800 Subject: [PATCH 15/27] Update logstash_releases.json (#17055) - 8.18 branch was cut 2025-01-29; add 8.next and shift 8.future - 8.16.4 and 8.17.2 were released 2025-02-11; shift forward --- ci/logstash_releases.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ci/logstash_releases.json b/ci/logstash_releases.json index 6b695636968..7c2d8fa076a 100644 --- a/ci/logstash_releases.json +++ b/ci/logstash_releases.json @@ -1,15 +1,15 @@ { "releases": { "7.current": "7.17.27", - "8.previous": "8.16.3", - "8.current": "8.17.1" + "8.previous": "8.16.4", + "8.current": "8.17.2" }, "snapshots": { "7.current": "7.17.28-SNAPSHOT", - "8.previous": "8.16.4-SNAPSHOT", - "8.current": "8.17.2-SNAPSHOT", - "8.next": null, - "8.future": "8.18.0-SNAPSHOT", + "8.previous": "8.16.5-SNAPSHOT", + "8.current": "8.17.3-SNAPSHOT", + "8.next": "8.18.0-SNAPSHOT", + "8.future": "8.19.0-SNAPSHOT", "9.next": "9.0.0-SNAPSHOT", "main": "9.1.0-SNAPSHOT" } From 8cd38499b57fa2de5da2641fbf54c9be4f5f646b Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Wed, 12 Feb 2025 16:17:52 +0200 Subject: [PATCH 16/27] Use centralized source of truth for active branches (#17063) This commit simplifies the DRA process in Logstash by removing the need to maintain a separate file for the active branches, and instead rely on a centrally maintained file containing source of truth. 
While at it, we refactor/simplify the creation of an array with the versions in `.buildkite/scripts/snyk/resolve_stack_version.sh`. --- .../common/trigger-pipeline-generate-steps.sh | 4 +-- .../scripts/snyk/resolve_stack_version.sh | 8 +++--- ci/branches.json | 26 ------------------- 3 files changed, 6 insertions(+), 32 deletions(-) delete mode 100644 ci/branches.json diff --git a/.buildkite/scripts/common/trigger-pipeline-generate-steps.sh b/.buildkite/scripts/common/trigger-pipeline-generate-steps.sh index e6c7ef240c3..f0dc4263aa2 100755 --- a/.buildkite/scripts/common/trigger-pipeline-generate-steps.sh +++ b/.buildkite/scripts/common/trigger-pipeline-generate-steps.sh @@ -12,7 +12,7 @@ set -eo pipefail # https://github.com/elastic/ingest-dev/issues/2664 # ******************************************************* -ACTIVE_BRANCHES_URL="https://raw.githubusercontent.com/elastic/logstash/main/ci/branches.json" +ACTIVE_BRANCHES_URL="https://storage.googleapis.com/artifacts-api/snapshots/branches.json" EXCLUDE_BRANCHES_ARRAY=() BRANCHES=() @@ -63,7 +63,7 @@ exclude_branches_to_array set -u set +e # pull releaseable branches from $ACTIVE_BRANCHES_URL -readarray -t ELIGIBLE_BRANCHES < <(curl --retry-all-errors --retry 5 --retry-delay 5 -fsSL $ACTIVE_BRANCHES_URL | jq -r '.branches[].branch') +readarray -t ELIGIBLE_BRANCHES < <(curl --retry-all-errors --retry 5 --retry-delay 5 -fsSL $ACTIVE_BRANCHES_URL | jq -r '.branches[]') if [[ $? -ne 0 ]]; then echo "There was an error downloading or parsing the json output from [$ACTIVE_BRANCHES_URL]. Exiting." 
exit 1 diff --git a/.buildkite/scripts/snyk/resolve_stack_version.sh b/.buildkite/scripts/snyk/resolve_stack_version.sh index 088051b23e8..06da9baa509 100755 --- a/.buildkite/scripts/snyk/resolve_stack_version.sh +++ b/.buildkite/scripts/snyk/resolve_stack_version.sh @@ -6,9 +6,9 @@ set -e -VERSION_URL="https://raw.githubusercontent.com/elastic/logstash/main/ci/branches.json" +VERSION_URL="https://storage.googleapis.com/artifacts-api/snapshots/branches.json" echo "Fetching versions from $VERSION_URL" -VERSIONS=$(curl --silent $VERSION_URL) -TARGET_BRANCHES=$(echo "$VERSIONS" | jq -r '.branches | map(.branch) | join(" ")') -TARGET_BRANCHES=($TARGET_BRANCHES) +readarray -t TARGET_BRANCHES < <(curl --retry-all-errors --retry 5 --retry-delay 5 -fsSL $VERSION_URL | jq -r '.branches[]') +echo "${TARGET_BRANCHES[@]}" + diff --git a/ci/branches.json b/ci/branches.json deleted file mode 100644 index a5afd2f4d0e..00000000000 --- a/ci/branches.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "notice": "This file is not maintained outside of the main branch and should only be used for tooling.", - "branches": [ - { - "branch": "main" - }, - { - "branch": "9.0" - }, - { - "branch": "8.x" - }, - { - "branch": "8.16" - }, - { - "branch": "8.17" - }, - { - "branch": "8.18" - }, - { - "branch": "7.17" - } - ] - } From 78c34465dc7c4809611cd35a4fb742ae436020d4 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Thu, 13 Feb 2025 18:13:17 +0200 Subject: [PATCH 17/27] Allow capturing heap dumps in DRA BK jobs (#17081) This commit allows Buildkite to capture any heap dumps produced during DRA builds. 
--- .buildkite/scripts/dra/generatesteps.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.buildkite/scripts/dra/generatesteps.py b/.buildkite/scripts/dra/generatesteps.py index d26e34009e3..85891e1b9ac 100644 --- a/.buildkite/scripts/dra/generatesteps.py +++ b/.buildkite/scripts/dra/generatesteps.py @@ -30,6 +30,8 @@ def package_x86_step(branch, workflow_type): export PATH="/opt/buildkite-agent/.rbenv/bin:/opt/buildkite-agent/.pyenv/bin:$PATH" eval "$(rbenv init -)" .buildkite/scripts/dra/build_packages.sh + artifact_paths: + - "**/*.hprof" ''' return step @@ -44,6 +46,8 @@ def package_x86_docker_step(branch, workflow_type): image: family/platform-ingest-logstash-ubuntu-2204 machineType: "n2-standard-16" diskSizeGb: 200 + artifact_paths: + - "**/*.hprof" command: | export WORKFLOW_TYPE="{workflow_type}" export PATH="/opt/buildkite-agent/.rbenv/bin:/opt/buildkite-agent/.pyenv/bin:$PATH" @@ -63,6 +67,8 @@ def package_aarch64_docker_step(branch, workflow_type): imagePrefix: platform-ingest-logstash-ubuntu-2204-aarch64 instanceType: "m6g.4xlarge" diskSizeGb: 200 + artifact_paths: + - "**/*.hprof" command: | export WORKFLOW_TYPE="{workflow_type}" export PATH="/opt/buildkite-agent/.rbenv/bin:/opt/buildkite-agent/.pyenv/bin:$PATH" From d20eb4dbcbf781a39c178d75cb8b70eb9a5a9c56 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Fri, 14 Feb 2025 07:53:52 -0800 Subject: [PATCH 18/27] qa: use clean expansion of LS tarball per fixture instance (#17082) * qa: use clean expansion of LS tarball per fixture instance Because QA tests can _modify_ the Logstash installation (e.g. those that invoke the plugin manager), it is important that the service wrapper begins with a clean expansion of the logstash tarball. 
* qa: enable safe reuse of ls_home in ls_to_ls tests --- build.gradle | 2 +- qa/integration/framework/settings.rb | 20 +++++++ qa/integration/services/logstash_service.rb | 56 +++++++++++++++---- .../specs/logstash_to_logstash_spec.rb | 22 ++++---- 4 files changed, 78 insertions(+), 22 deletions(-) diff --git a/build.gradle b/build.gradle index d184538cdfc..8696ac4b2f8 100644 --- a/build.gradle +++ b/build.gradle @@ -414,7 +414,7 @@ def qaBuildPath = "${buildDir}/qa/integration" def qaVendorPath = "${qaBuildPath}/vendor" tasks.register("installIntegrationTestGems") { - dependsOn unpackTarDistribution + dependsOn assembleTarDistribution def gemfilePath = file("${projectDir}/qa/integration/Gemfile") inputs.files gemfilePath inputs.files file("${projectDir}/qa/integration/integration_tests.gemspec") diff --git a/qa/integration/framework/settings.rb b/qa/integration/framework/settings.rb index 3492b2b14a8..40adc9117b9 100644 --- a/qa/integration/framework/settings.rb +++ b/qa/integration/framework/settings.rb @@ -16,6 +16,7 @@ # under the License. require 'yaml' +require 'delegate' # All settings for a test, global and per test class TestSettings @@ -76,4 +77,23 @@ def feature_config_dir feature = feature_flag feature.empty? ? 
nil : File.join(FIXTURES_DIR, feature) end + + def override(kv_map) + Override.new(self, kv_map) + end + + class Override < SimpleDelegator# quacks like TestSettings + def initialize(test_settings, overrides) + super(test_settings) + @overrides = overrides + end + + def is_set?(key) + @overrides.include?(key) || super + end + + def get(key) + @overrides.fetch(key) { super } + end + end end diff --git a/qa/integration/services/logstash_service.rb b/qa/integration/services/logstash_service.rb index d9eacc24bc0..1a915ca334a 100644 --- a/qa/integration/services/logstash_service.rb +++ b/qa/integration/services/logstash_service.rb @@ -53,22 +53,58 @@ def initialize(settings, api_port = 9600) if @settings.is_set?("ls_home_abs_path") @logstash_home = @settings.get("ls_home_abs_path") else - # use the LS which was just built in source repo - ls_version_file = YAML.load_file(LS_VERSION_FILE) - ls_file = "logstash-" + ls_version_file["logstash"] - # First try without the snapshot if it's there - @logstash_home = File.expand_path(File.join(LS_BUILD_DIR, ls_file), __FILE__) - @logstash_home += "-SNAPSHOT" unless Dir.exist?(@logstash_home) - - puts "Using #{@logstash_home} as LS_HOME" - @logstash_bin = File.join("#{@logstash_home}", LS_BIN) - raise "Logstash binary not found in path #{@logstash_home}" unless File.file? @logstash_bin + @logstash_home = clean_expand_built_tarball end + puts "Using #{@logstash_home} as LS_HOME" + @logstash_bin = File.join("#{@logstash_home}", LS_BIN) + raise "Logstash binary not found in path #{@logstash_home}" unless File.file? 
@logstash_bin + @default_settings_file = File.join(@logstash_home, LS_CONFIG_FILE) @monitoring_api = MonitoringAPI.new(api_port) end + ## + # @return [String] the path to a CLEAN expansion of the locally-built tarball + def clean_expand_built_tarball + build_dir = File.expand_path(LS_BUILD_DIR, __FILE__) # source of tarball + target_dir = File.join(build_dir, "qa-fixture") + + # find the built tarball matching the current version, preferring non-SNAPSHOT + ls_version = YAML.load_file(LS_VERSION_FILE).fetch("logstash") + candidates = %W( + logstash-#{ls_version}.tar.gz + logstash-#{ls_version}-SNAPSHOT.tar.gz + ) + + candidates.each do |tarball_candidate| + tarball_candidate_path = File.join(build_dir, tarball_candidate) + if File.exist?(tarball_candidate_path) + expected_untar_directory = File.basename(tarball_candidate, ".tar.gz") + result_logstash_home = File.join(target_dir, expected_untar_directory) + + if Dir.exist?(result_logstash_home) + puts "expunging(#{result_logstash_home})" + # FileUtils#rm_rf cannot be used here because it silently fails to remove the bundled jdk on MacOS + expunge_result = `rm -rf #{Shellwords.escape(result_logstash_home)} 2>&1` + fail("ERROR EXPUNGING: #{expunge_result}") unless $?.success? + end + + puts "expanding(#{tarball_candidate_path})" + FileUtils.mkdir_p(target_dir) unless Dir.exist?(target_dir) + FileUtils.chdir(target_dir) do + expand_result = `tar -xzf #{Shellwords.escape(tarball_candidate_path)} 2>&1` + fail("ERROR EXPANDING: #{expand_result}") unless $?.success? + end + + return result_logstash_home + end + end + + fail("failed to find any matching build tarballs (looked for `#{candidates}` in `#{build_dir}`)") + end + private :clean_expand_built_tarball + def alive? if @process.nil? || @process.exited? 
raise "Logstash process is not up because of an error, or it stopped" diff --git a/qa/integration/specs/logstash_to_logstash_spec.rb b/qa/integration/specs/logstash_to_logstash_spec.rb index 5efc09c071c..6bea9db0379 100644 --- a/qa/integration/specs/logstash_to_logstash_spec.rb +++ b/qa/integration/specs/logstash_to_logstash_spec.rb @@ -36,13 +36,6 @@ @fixture.teardown } - def change_logstash_setting(logstash_service, name, value) - settings = {}.tap do |settings| - settings[name] = value - end - IO.write(logstash_service.application_settings_file, settings.to_yaml) - end - def get_temp_path_dir tmp_path = Stud::Temporary.pathname tmp_data_path = File.join(tmp_path, "data") @@ -51,10 +44,17 @@ def get_temp_path_dir end def run_logstash_instance(config_name, options = {}) - api_port = 9600 + rand(1000) - logstash_service = LogstashService.new(@fixture.settings, api_port) - change_logstash_setting(logstash_service, "api.http.port", api_port) - logstash_service.spawn_logstash("-f", config_to_temp_file(@fixture.config(config_name, options)), "--path.data", get_temp_path_dir) + @next_api_port_offset = (@next_api_port_offset||0).next.modulo(1000) # cycle through 1000 possibles + api_port = 9600 + @next_api_port_offset + + # to avoid LogstashService's clean-from-tarball default behaviour, we need + # to tell it where our LOGSTASH_HOME is in the existing service + existing_fixture_logstash_home = @fixture.get_service("logstash").logstash_home + logstash_service = LogstashService.new(@fixture.settings.override("ls_home_abs_path" => existing_fixture_logstash_home), api_port) + + logstash_service.spawn_logstash("--path.config", config_to_temp_file(@fixture.config(config_name, options)), + "--path.data", get_temp_path_dir, + "--api.http.port", api_port.to_s) wait_for_logstash(logstash_service) logstash_service end From e896cd727dda41a0bda9ce6e65c4fb391c3d1b4f Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Mon, 17 Feb 2025 
11:08:19 +0000 Subject: [PATCH 19/27] CPM handle 404 response gracefully with user-friendly log (#17052) Starting from es-output 12.0.2, a 404 response is treated as an error. Previously, central pipeline management considered 404 as an empty pipeline, not an error. This commit restores the expected behavior by handling 404 gracefully and logs a user-friendly message. It also removes the redundant cache of pipeline in CPM Fixes: #17035 --- .../config_management/elasticsearch_source.rb | 21 +++++----- .../elasticsearch_source_spec.rb | 39 +++++++++---------- 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 6ad08ad116a..0baf448f601 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -59,20 +59,19 @@ def get_pipeline_fetcher(es_version) def pipeline_configs logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids) - begin - license_check(true) - rescue LogStash::LicenseChecker::LicenseError => e - if @cached_pipelines.nil? 
- raise e - else - return @cached_pipelines - end - end + license_check(true) es_version = get_es_version fetcher = get_pipeline_fetcher(es_version) - fetcher.fetch_config(es_version, pipeline_ids, client) - @cached_pipelines = fetcher.get_pipeline_ids.collect do |pid| + begin + fetcher.fetch_config(es_version, pipeline_ids, client) + rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + # es-output 12.0.2 throws 404 as error, but we want to handle it as empty config + return [] if e.response_code == 404 + raise e + end + + fetcher.get_pipeline_ids.collect do |pid| get_pipeline(pid, fetcher) end.compact end diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index 285c520a229..b16916ca5e6 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -103,23 +103,7 @@ "status" => 400} } - let(:elasticsearch_8_err_response) { - {"error" => - {"root_cause" => - [{"type" => "index_not_found_exception", - "reason" => "no such index [.logstash]", - "resource.type" => "index_expression", - "resource.id" => ".logstash", - "index_uuid" => "_na_", - "index" => ".logstash"}], - "type" => "index_not_found_exception", - "reason" => "no such index [.logstash]", - "resource.type" => "index_expression", - "resource.id" => ".logstash", - "index_uuid" => "_na_", - "index" => ".logstash"}, - "status" => 404} - } + let(:elasticsearch_8_err_response) { {"error" => "Incorrect HTTP method for uri", "status" => 405} } before do extension.additionals_settings(system_settings) @@ -466,13 +450,13 @@ }] }.to_json end - let(:es_path) { ".logstash/_mget" } + let(:legacy_api) { ".logstash/_mget" } let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) } before do allow(mock_client).to 
receive(:get).with(system_indices_url_regex).and_return(LogStash::Json.load(elasticsearch_response)) allow(mock_client).to receive(:get).with("/").and_return(es_version_response) - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + allow(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response) allow(mock_license_client).to receive(:get).with('/').and_return(cluster_info(LOGSTASH_VERSION)) @@ -493,7 +477,7 @@ before :each do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) allow_any_instance_of(described_class).to receive(:logger).and_return(logger_stub) - allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) + allow(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_7_9_response)) end let(:config) { "input { generator {} } filter { mutate {} } output { }" } @@ -734,6 +718,7 @@ describe "when exception occur" do let(:elasticsearch_response) { "" } + let(:bad_response_404) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(404, "url", "request_body", "response_body") } before do expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client) @@ -747,9 +732,21 @@ it "raises the exception upstream in [7.9]" do allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) - expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad") + expect(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_raise("Something bad") expect { subject.pipeline_configs }.to raise_error /Something bad/ end + + it 
"returns empty pipeline when ES client raise BadResponseCodeError in [8]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_response) + expect(mock_client).to receive(:get).with(system_indices_url_regex).and_raise(bad_response_404) + expect(subject.pipeline_configs).to be_empty + end + + it "returns empty pipeline when ES client raise BadResponseCodeError in [7.9]" do + allow(mock_client).to receive(:get).with("/").and_return(es_version_7_9_response) + expect(mock_client).to receive(:post).with(legacy_api, {}, request_body_string).and_raise(bad_response_404) + expect(subject.pipeline_configs).to be_empty + end end end From 637f447b8850e380b72c0a16b446564a59f0a217 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Mon, 17 Feb 2025 19:01:44 +0000 Subject: [PATCH 20/27] allow concurrent Batch deserialization (#17050) Currently the deserialization is behind the readBatch's lock, so any large batch will take time deserializing, causing any other Queue writer (e.g. netty executor threads) and any other Queue reader (pipeline worker) to block. This commit moves the deserialization out of the lock, allowing multiple pipeline workers to deserialize batches concurrently. 
- add intermediate batch-holder from `Queue` methods - make the intermediate batch-holder a private inner class of `Queue` with a descriptive name `SerializedBatchHolder` Co-authored-by: Ry Biesemeyer --- .../java/org/logstash/ackedqueue/Queue.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index cb3c4e29701..e232c99a5f8 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -590,13 +590,18 @@ public void ensurePersistedUpto(long seqNum) throws IOException{ * @throws IOException if an IO error occurs */ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { + final SerializedBatchHolder serializedBatchHolder; lock.lock(); try { Page p = nextReadPage(); - return (isHeadPage(p) && p.isFullyRead()) ? null : readPageBatch(p, limit, 0L); + if (isHeadPage(p) && p.isFullyRead()) { + return null; + } + serializedBatchHolder = readPageBatch(p, limit, 0L); } finally { lock.unlock(); } + return serializedBatchHolder.deserialize(); } /** @@ -607,7 +612,11 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { * @throws QueueRuntimeException if queue is closed * @throws IOException if an IO error occurs */ - public synchronized Batch readBatch(int limit, long timeout) throws IOException { + public Batch readBatch(int limit, long timeout) throws IOException { + return readSerializedBatch(limit, timeout).deserialize(); + } + + private synchronized SerializedBatchHolder readSerializedBatch(int limit, long timeout) throws IOException { lock.lock(); try { @@ -618,7 +627,7 @@ public synchronized Batch readBatch(int limit, long timeout) throws IOException } /** - * read a {@link Batch} from the given {@link Page}. 
If the page is a head page, try to maximize the + * read a {@link SerializedBatchHolder} from the given {@link Page}. If the page is a head page, try to maximize the * batch size by waiting for writes. * @param p the {@link Page} to read from. * @param limit size limit of the batch to read. @@ -626,7 +635,7 @@ public synchronized Batch readBatch(int limit, long timeout) throws IOException * @return {@link Batch} with read elements or null if nothing was read * @throws IOException if an IO error occurs */ - private Batch readPageBatch(Page p, int limit, long timeout) throws IOException { + private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException { int left = limit; final List elements = new ArrayList<>(limit); @@ -678,7 +687,7 @@ private Batch readPageBatch(Page p, int limit, long timeout) throws IOException removeUnreadPage(p); } - return new Batch(elements, firstSeqNum, this); + return new SerializedBatchHolder(elements, firstSeqNum); } /** @@ -894,4 +903,18 @@ private static boolean containsSeq(final Page page, final long seqNum) { final long pMaxSeq = pMinSeq + (long) page.getElementCount(); return seqNum >= pMinSeq && seqNum < pMaxSeq; } + + class SerializedBatchHolder { + private final List elements; + private final long firstSeqNum; + + private SerializedBatchHolder(List elements, long firstSeqNum) { + this.elements = elements; + this.firstSeqNum = firstSeqNum; + } + + private Batch deserialize() { + return new Batch(elements, firstSeqNum, Queue.this); + } + } } From 9abad6609c21c4e796cb9a97ba782a7a9837f589 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Tue, 18 Feb 2025 21:53:35 -0800 Subject: [PATCH 21/27] spec: improve ls2ls spec (#17114) * spec: improve ls2ls spec - fixes upstream/downstream convention - upstream: the sending logstash (has an LS output) - downstream: the receiving logstash (has an LS input) - helper `run_logstash_instance` yields the `LogstashService` instance and handles the teardown. 
- pass the pipeline id and node name to the LS instances via command line flags to make logging easier to differentiate - use the generator input's sequence id to ensure that the _actual_ events generated are received by the downstream pipeline * start with port-offset 100 Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> --- .../fixtures/logstash_to_logstash_spec.yml | 2 +- .../specs/logstash_to_logstash_spec.rb | 45 ++++++++++--------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/qa/integration/fixtures/logstash_to_logstash_spec.yml b/qa/integration/fixtures/logstash_to_logstash_spec.yml index 9b47b6b888a..89401a298bf 100644 --- a/qa/integration/fixtures/logstash_to_logstash_spec.yml +++ b/qa/integration/fixtures/logstash_to_logstash_spec.yml @@ -14,7 +14,7 @@ config: file { path => '<%=options[:output_file_path]%>' flush_interval => 0 - codec => line { format => "%{message}" } + codec => line { format => "%{[event][sequence]}:%{message}" } } } basic_ls_output: |- diff --git a/qa/integration/specs/logstash_to_logstash_spec.rb b/qa/integration/specs/logstash_to_logstash_spec.rb index 6bea9db0379..6ea0b89e7f3 100644 --- a/qa/integration/specs/logstash_to_logstash_spec.rb +++ b/qa/integration/specs/logstash_to_logstash_spec.rb @@ -43,8 +43,8 @@ def get_temp_path_dir tmp_data_path end - def run_logstash_instance(config_name, options = {}) - @next_api_port_offset = (@next_api_port_offset||0).next.modulo(1000) # cycle through 1000 possibles + def run_logstash_instance(config_name, options = {}, &block) + @next_api_port_offset = (@next_api_port_offset||100).next.modulo(1000) # cycle through 1000 possibles api_port = 9600 + @next_api_port_offset # to avoid LogstashService's clean-from-tarball default behaviour, we need @@ -52,11 +52,17 @@ def run_logstash_instance(config_name, options = {}) existing_fixture_logstash_home = 
@fixture.get_service("logstash").logstash_home logstash_service = LogstashService.new(@fixture.settings.override("ls_home_abs_path" => existing_fixture_logstash_home), api_port) - logstash_service.spawn_logstash("--path.config", config_to_temp_file(@fixture.config(config_name, options)), + logstash_service.spawn_logstash("--node.name", config_name, + "--pipeline.id", config_name, + "--path.config", config_to_temp_file(@fixture.config(config_name, options)), "--path.data", get_temp_path_dir, "--api.http.port", api_port.to_s) wait_for_logstash(logstash_service) - logstash_service + + yield logstash_service + + ensure + logstash_service&.teardown end def wait_for_logstash(service) @@ -86,26 +92,25 @@ def wait_for_logstash(service) } it "successfully send events" do - upstream_logstash_service = run_logstash_instance(input_config_name, all_config_options) - downstream_logstash_service = run_logstash_instance(output_config_name, all_config_options) + run_logstash_instance(input_config_name, all_config_options) do |downstream_logstash_service| + run_logstash_instance(output_config_name, all_config_options) do |upstream_logstash_service| - try(num_retries) do - event_stats = upstream_logstash_service.monitoring_api.event_stats - if event_stats - expect(event_stats["in"]).to eq(num_events) - end - end + try(num_retries) do + downstream_event_stats = downstream_logstash_service.monitoring_api.event_stats - upstream_logstash_service.teardown - downstream_logstash_service.teardown + expect(downstream_event_stats).to include({"in" => num_events}), lambda { "expected #{num_events} events to have been received by downstream"} + end - # make sure received events are in the file - file_output_path = File.join(upstream_logstash_service.logstash_home, output_file_path_with_datetime) - expect(File).to exist(file_output_path), "Logstash to Logstash output file: #{file_output_path} does not exist" - count = File.foreach(file_output_path).inject(0) { |c, _| c + 1 } - expect(count).to 
eq(num_events) + # make sure received events are in the file + file_output_path = File.join(downstream_logstash_service.logstash_home, output_file_path_with_datetime) + expect(File).to exist(file_output_path), "Logstash to Logstash output file: #{file_output_path} does not exist" + actual_lines = File.read(file_output_path).lines.to_a + expected_lines = (0...num_events).map { |sequence| "#{sequence}:Hello world!\n" } + expect(actual_lines).to match_array(expected_lines) - File.delete(file_output_path) + File.delete(file_output_path) + end + end end end From 089558801e1c5bc9a7cea24a10cee69f98046c6a Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 19 Feb 2025 11:17:20 -0800 Subject: [PATCH 22/27] plugins: improve `remove` command to support multiple plugins (#17030) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removal works in a single pass by finding plugins that would have unmet dependencies if all of the specified plugins were to be removed, and proceeding with the removal only if no conflicts were created. > ~~~ > ╭─{ rye@perhaps:~/src/elastic/logstash@main (pluginmanager-remove-multiple ✘) } > ╰─● bin/logstash-plugin remove logstash-input-syslog logstash-filter-grok > Using system java: /Users/rye/.jenv/shims/java > Resolving dependencies...... 
> Successfully removed logstash-input-syslog > Successfully removed logstash-filter-grok > [success (00:00:05)] ~~~ --- .../bundler/logstash_uninstall.rb | 94 +++++++------ lib/pluginmanager/remove.rb | 61 +++++---- .../fixtures/plugins/generate-gems.sh | 4 + ...lter-four_depends_on_one_and_three.gemspec | 26 ++++ ...ogstash-filter-one_no_dependencies.gemspec | 23 ++++ ...stash-filter-three_no_dependencies.gemspec | 23 ++++ ...logstash-filter-two_depends_on_one.gemspec | 25 ++++ ...gstash-filter-zero_no_dependencies.gemspec | 23 ++++ qa/integration/services/logstash_service.rb | 20 ++- qa/integration/specs/cli/remove_spec.rb | 128 +++++++++++++++++- rakelib/plugin.rake | 4 +- 11 files changed, 349 insertions(+), 82 deletions(-) create mode 100755 qa/integration/fixtures/plugins/generate-gems.sh create mode 100644 qa/integration/fixtures/plugins/logstash-filter-four_depends_on_one_and_three.gemspec create mode 100644 qa/integration/fixtures/plugins/logstash-filter-one_no_dependencies.gemspec create mode 100644 qa/integration/fixtures/plugins/logstash-filter-three_no_dependencies.gemspec create mode 100644 qa/integration/fixtures/plugins/logstash-filter-two_depends_on_one.gemspec create mode 100644 qa/integration/fixtures/plugins/logstash-filter-zero_no_dependencies.gemspec diff --git a/lib/pluginmanager/bundler/logstash_uninstall.rb b/lib/pluginmanager/bundler/logstash_uninstall.rb index 11cfbefbc95..3e32d9ece09 100644 --- a/lib/pluginmanager/bundler/logstash_uninstall.rb +++ b/lib/pluginmanager/bundler/logstash_uninstall.rb @@ -34,54 +34,50 @@ def initialize(gemfile_path, lockfile_path) @lockfile_path = lockfile_path end - # To be uninstalled the candidate gems need to be standalone. 
- def dependants_gems(gem_name) - builder = Dsl.new - builder.eval_gemfile(::File.join(::File.dirname(gemfile_path), "original gemfile"), File.read(gemfile_path)) - definition = builder.to_definition(lockfile_path, {}) - - definition.specs - .select { |spec| spec.dependencies.collect(&:name).include?(gem_name) } - .collect(&:name).sort.uniq - end - - def uninstall!(gem_name) - unfreeze_gemfile do - dependencies_from = dependants_gems(gem_name) - - if dependencies_from.size > 0 - display_cant_remove_message(gem_name, dependencies_from) - false - else - remove_gem(gem_name) - true + def uninstall!(gems_to_remove) + gems_to_remove = Array(gems_to_remove) + + unsatisfied_dependency_mapping = Dsl.evaluate(gemfile_path, lockfile_path, {}).specs.each_with_object({}) do |spec, memo| + next if gems_to_remove.include?(spec.name) + deps = spec.runtime_dependencies.collect(&:name) + deps.intersection(gems_to_remove).each do |missing_dependency| + memo[missing_dependency] ||= [] + memo[missing_dependency] << spec.name end end - end + if unsatisfied_dependency_mapping.any? 
+ unsatisfied_dependency_mapping.each { |gem_to_remove, gems_depending_on_removed| display_cant_remove_message(gem_to_remove, gems_depending_on_removed) } + LogStash::PluginManager.ui.info("No plugins were removed.") + return false + end + + with_mutable_gemfile do |gemfile| + gems_to_remove.each { |gem_name| gemfile.remove(gem_name) } - def remove_gem(gem_name) - builder = Dsl.new - file = File.new(gemfile_path, "r+") + builder = Dsl.new + builder.eval_gemfile(::File.join(::File.dirname(gemfile_path), "gemfile to changes"), gemfile.generate) - gemfile = LogStash::Gemfile.new(file).load - gemfile.remove(gem_name) - builder.eval_gemfile(::File.join(::File.dirname(gemfile_path), "gemfile to changes"), gemfile.generate) + # build a definition, providing an intentionally-empty "unlock" mapping + # to ensure that all gem versions remain locked + definition = builder.to_definition(lockfile_path, {}) - definition = builder.to_definition(lockfile_path, {}) - definition.lock(lockfile_path) - gemfile.save + # lock the definition and save our modified gemfile + definition.lock(lockfile_path) + gemfile.save + + gems_to_remove.each do |gem_name| + LogStash::PluginManager.ui.info("Successfully removed #{gem_name}") + end - LogStash::PluginManager.ui.info("Successfully removed #{gem_name}") - ensure - file.close if file + return true + end end def display_cant_remove_message(gem_name, dependencies_from) - message = <<-eos -Failed to remove \"#{gem_name}\" because the following plugins or libraries depend on it: - -* #{dependencies_from.join("\n* ")} - eos + message = <<~EOS + Failed to remove \"#{gem_name}\" because the following plugins or libraries depend on it: + * #{dependencies_from.join("\n* ")} + EOS LogStash::PluginManager.ui.info(message) end @@ -93,10 +89,22 @@ def unfreeze_gemfile end end - def self.uninstall!(gem_name, options = { :gemfile => LogStash::Environment::GEMFILE, :lockfile => LogStash::Environment::LOCKFILE }) - gemfile_path = options[:gemfile] - 
lockfile_path = options[:lockfile] - LogstashUninstall.new(gemfile_path, lockfile_path).uninstall!(gem_name) + ## + # Yields a mutable gemfile backed by an open, writable file handle. + # It is the responsibility of the caller to send `LogStash::Gemfile#save` to persist the result. + # @yieldparam [LogStash::Gemfile] + def with_mutable_gemfile + unfreeze_gemfile do + File.open(gemfile_path, 'r+') do |file| + yield LogStash::Gemfile.new(file).load + end + end + end + + def self.uninstall!(gem_names, options={}) + gemfile_path = options[:gemfile] || LogStash::Environment::GEMFILE + lockfile_path = options[:lockfile] || LogStash::Environment::LOCKFILE + LogstashUninstall.new(gemfile_path, lockfile_path).uninstall!(Array(gem_names)) end end end diff --git a/lib/pluginmanager/remove.rb b/lib/pluginmanager/remove.rb index e6c5d33d4d8..36a8592f326 100644 --- a/lib/pluginmanager/remove.rb +++ b/lib/pluginmanager/remove.rb @@ -20,48 +20,51 @@ require "pluginmanager/command" class LogStash::PluginManager::Remove < LogStash::PluginManager::Command - parameter "PLUGIN", "plugin name" + parameter "PLUGIN ...", "plugin name(s)" def execute + signal_error("One or more plugins must be specified") if plugin_list.empty? signal_error("File #{LogStash::Environment::GEMFILE_PATH} does not exist or is not writable, aborting") unless File.writable?(LogStash::Environment::GEMFILE_PATH) LogStash::Bundler.prepare({:without => [:build, :development]}) - if LogStash::PluginManager::ALIASES.has_key?(plugin) - unless LogStash::PluginManager.installed_plugin?(plugin, gemfile) - aliased_plugin = LogStash::PluginManager::ALIASES[plugin] - puts "Cannot remove the alias #{plugin}, which is an alias for #{aliased_plugin}; if you wish to remove it, you must remove the aliased plugin instead." 
- return + plugin_list.each do |plugin| + if LogStash::PluginManager::ALIASES.has_key?(plugin) + unless LogStash::PluginManager.installed_plugin?(plugin, gemfile) + aliased_plugin = LogStash::PluginManager::ALIASES[plugin] + puts "Cannot remove the alias #{plugin}, which is an alias for #{aliased_plugin}; if you wish to remove it, you must remove the aliased plugin instead." + return + end end - end - # If a user is attempting to uninstall X-Pack, present helpful output to guide - # them toward the OSS-only distribution of Logstash - LogStash::PluginManager::XPackInterceptor::Remove.intercept!(plugin) + # If a user is attempting to uninstall X-Pack, present helpful output to guide + # them toward the OSS-only distribution of Logstash + LogStash::PluginManager::XPackInterceptor::Remove.intercept!(plugin) - # if the plugin is provided by an integration plugin. abort. - if integration_plugin = LogStash::PluginManager.which_integration_plugin_provides(plugin, gemfile) - signal_error("This plugin is already provided by '#{integration_plugin.name}' so it can't be removed individually") - end + # if the plugin is provided by an integration plugin. abort. + if integration_plugin = LogStash::PluginManager.which_integration_plugin_provides(plugin, gemfile) + signal_error("The plugin `#{plugin}` is provided by '#{integration_plugin.name}' so it can't be removed individually") + end - not_installed_message = "This plugin has not been previously installed" - plugin_gem_spec = LogStash::PluginManager.find_plugins_gem_specs(plugin).any? - if plugin_gem_spec - # make sure this is an installed plugin and present in Gemfile. 
- # it is not possible to uninstall a dependency not listed in the Gemfile, for example a dependent codec - signal_error(not_installed_message) unless LogStash::PluginManager.installed_plugin?(plugin, gemfile) - else - # locally installed plugins are not recoginized by ::Gem::Specification - # we may ::Bundler.setup to reload but it resets all dependencies so we get error message for future bundler operations - # error message: `Bundler::GemNotFound: Could not find rubocop-1.60.2... in locally installed gems` - # instead we make sure Gemfile has a definition and ::Gem::Specification recognizes local gem - signal_error(not_installed_message) unless !!gemfile.find(plugin) + not_installed_message = "The plugin `#{plugin}` has not been previously installed" + plugin_gem_spec = LogStash::PluginManager.find_plugins_gem_specs(plugin).any? + if plugin_gem_spec + # make sure this is an installed plugin and present in Gemfile. + # it is not possible to uninstall a dependency not listed in the Gemfile, for example a dependent codec + signal_error(not_installed_message) unless LogStash::PluginManager.installed_plugin?(plugin, gemfile) + else + # locally installed plugins are not recoginized by ::Gem::Specification + # we may ::Bundler.setup to reload but it resets all dependencies so we get error message for future bundler operations + # error message: `Bundler::GemNotFound: Could not find rubocop-1.60.2... 
in locally installed gems` + # instead we make sure Gemfile has a definition and ::Gem::Specification recognizes local gem + signal_error(not_installed_message) unless !!gemfile.find(plugin) - local_gem = gemfile.locally_installed_gems.detect { |local_gem| local_gem.name == plugin } - signal_error(not_installed_message) unless local_gem + local_gem = gemfile.locally_installed_gems.detect { |local_gem| local_gem.name == plugin } + signal_error(not_installed_message) unless local_gem + end end - exit(1) unless ::Bundler::LogstashUninstall.uninstall!(plugin) + exit(1) unless ::Bundler::LogstashUninstall.uninstall!(plugin_list) LogStash::Bundler.genericize_platform remove_unused_locally_installed_gems! rescue => exception diff --git a/qa/integration/fixtures/plugins/generate-gems.sh b/qa/integration/fixtures/plugins/generate-gems.sh new file mode 100755 index 00000000000..2871367b2c3 --- /dev/null +++ b/qa/integration/fixtures/plugins/generate-gems.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env sh + +cd "$( dirname "$0" )" +find . 
-name '*.gemspec' | xargs -n1 gem build \ No newline at end of file diff --git a/qa/integration/fixtures/plugins/logstash-filter-four_depends_on_one_and_three.gemspec b/qa/integration/fixtures/plugins/logstash-filter-four_depends_on_one_and_three.gemspec new file mode 100644 index 00000000000..a2b36dfb747 --- /dev/null +++ b/qa/integration/fixtures/plugins/logstash-filter-four_depends_on_one_and_three.gemspec @@ -0,0 +1,26 @@ +Gem::Specification.new do |s| + s.name = File.basename(__FILE__, ".gemspec") + s.version = '0.1.1' + s.licenses = ['Apache-2.0'] + s.summary = "A dummy plugin with two plugin dependencies" + s.description = "This plugin is only used in the acceptance test" + s.authors = ["Elasticsearch"] + s.email = 'info@elasticsearch.com' + s.homepage = "http://www.elasticsearch.org/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = [__FILE__] + + # Tests + s.test_files = [] + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + + s.add_runtime_dependency "logstash-filter-one_no_dependencies", "~> 0.1" + s.add_runtime_dependency "logstash-filter-three_no_dependencies", "~> 0.1" +end diff --git a/qa/integration/fixtures/plugins/logstash-filter-one_no_dependencies.gemspec b/qa/integration/fixtures/plugins/logstash-filter-one_no_dependencies.gemspec new file mode 100644 index 00000000000..edc854a29f1 --- /dev/null +++ b/qa/integration/fixtures/plugins/logstash-filter-one_no_dependencies.gemspec @@ -0,0 +1,23 @@ +Gem::Specification.new do |s| + s.name = File.basename(__FILE__, ".gemspec") + s.version = '0.1.1' + s.licenses = ['Apache-2.0'] + s.summary = "A dummy plugin with no dependencies" + s.description = "This plugin is only used in the acceptance test" + s.authors = ["Elasticsearch"] + s.email = 'info@elasticsearch.com' + 
s.homepage = "http://www.elasticsearch.org/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = [__FILE__] + + # Tests + s.test_files = [] + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" +end diff --git a/qa/integration/fixtures/plugins/logstash-filter-three_no_dependencies.gemspec b/qa/integration/fixtures/plugins/logstash-filter-three_no_dependencies.gemspec new file mode 100644 index 00000000000..edc854a29f1 --- /dev/null +++ b/qa/integration/fixtures/plugins/logstash-filter-three_no_dependencies.gemspec @@ -0,0 +1,23 @@ +Gem::Specification.new do |s| + s.name = File.basename(__FILE__, ".gemspec") + s.version = '0.1.1' + s.licenses = ['Apache-2.0'] + s.summary = "A dummy plugin with no dependencies" + s.description = "This plugin is only used in the acceptance test" + s.authors = ["Elasticsearch"] + s.email = 'info@elasticsearch.com' + s.homepage = "http://www.elasticsearch.org/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = [__FILE__] + + # Tests + s.test_files = [] + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" +end diff --git a/qa/integration/fixtures/plugins/logstash-filter-two_depends_on_one.gemspec b/qa/integration/fixtures/plugins/logstash-filter-two_depends_on_one.gemspec new file mode 100644 index 00000000000..e0c724ede31 --- /dev/null +++ b/qa/integration/fixtures/plugins/logstash-filter-two_depends_on_one.gemspec @@ -0,0 +1,25 @@ +Gem::Specification.new do |s| + s.name = File.basename(__FILE__, ".gemspec") + s.version = '0.1.1' + s.licenses = ['Apache-2.0'] + s.summary = "A dummy plugin 
with one plugin dependency" + s.description = "This plugin is only used in the acceptance test" + s.authors = ["Elasticsearch"] + s.email = 'info@elasticsearch.com' + s.homepage = "http://www.elasticsearch.org/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = [__FILE__] + + # Tests + s.test_files = [] + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + + s.add_runtime_dependency "logstash-filter-one_no_dependencies", "~> 0.1" +end diff --git a/qa/integration/fixtures/plugins/logstash-filter-zero_no_dependencies.gemspec b/qa/integration/fixtures/plugins/logstash-filter-zero_no_dependencies.gemspec new file mode 100644 index 00000000000..edc854a29f1 --- /dev/null +++ b/qa/integration/fixtures/plugins/logstash-filter-zero_no_dependencies.gemspec @@ -0,0 +1,23 @@ +Gem::Specification.new do |s| + s.name = File.basename(__FILE__, ".gemspec") + s.version = '0.1.1' + s.licenses = ['Apache-2.0'] + s.summary = "A dummy plugin with no dependencies" + s.description = "This plugin is only used in the acceptance test" + s.authors = ["Elasticsearch"] + s.email = 'info@elasticsearch.com' + s.homepage = "http://www.elasticsearch.org/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = [__FILE__] + + # Tests + s.test_files = [] + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" +end diff --git a/qa/integration/services/logstash_service.rb b/qa/integration/services/logstash_service.rb index 1a915ca334a..b4881807a8a 100644 --- a/qa/integration/services/logstash_service.rb +++ b/qa/integration/services/logstash_service.rb @@ 
-20,6 +20,7 @@ require "childprocess" require "bundler" require "socket" +require "shellwords" require "tempfile" require 'yaml' @@ -337,8 +338,9 @@ def initialize(logstash_service) @logstash_plugin = File.join(@logstash.logstash_home, LOGSTASH_PLUGIN) end - def remove(plugin_name) - run("remove #{plugin_name}") + def remove(plugin_name, *additional_plugins) + plugin_list = Shellwords.shelljoin([plugin_name]+additional_plugins) + run("remove #{plugin_list}") end def prepare_offline_pack(plugins, output_zip = nil) @@ -351,12 +353,16 @@ def prepare_offline_pack(plugins, output_zip = nil) end end - def list(plugin_name, verbose = false) - run("list #{plugin_name} #{verbose ? "--verbose" : ""}") + def list(*plugins, verbose: false) + command = "list" + command << " --verbose" if verbose + command << " #{Shellwords.shelljoin(plugins)}" if plugins.any? + run(command) end - def install(plugin_name) - run("install #{plugin_name}") + def install(plugin_name, *additional_plugins) + plugin_list = ([plugin_name]+additional_plugins).flatten + run("install #{Shellwords.shelljoin(plugin_list)}") end def run(command) @@ -364,7 +370,7 @@ def run(command) end def run_raw(cmd, change_dir = true, environment = {}) - @logstash.run_cmd(cmd.split(' '), change_dir, environment) + @logstash.run_cmd(Shellwords.shellsplit(cmd), change_dir, environment) end end end diff --git a/qa/integration/specs/cli/remove_spec.rb b/qa/integration/specs/cli/remove_spec.rb index 91aba8cf74a..efc64f1be27 100644 --- a/qa/integration/specs/cli/remove_spec.rb +++ b/qa/integration/specs/cli/remove_spec.rb @@ -22,7 +22,7 @@ require "logstash/devutils/rspec/spec_helper" describe "CLI > logstash-plugin remove" do - before(:all) do + before(:each) do @fixture = Fixture.new(__FILE__) @logstash_plugin = @fixture.get_service("logstash").plugin_cli end @@ -109,5 +109,131 @@ expect(presence_check.stderr_and_stdout).to match(/logstash-codec-json/) end end + + context "multiple plugins" do + + let(:setup_plugin_list) do + 
fail("spec must override `setup_plugin_list`") + end + + before(:each) do + if setup_plugin_list.any? + search_dir = File.expand_path(File.join(__dir__, "..", "..", "fixtures", "plugins")) + plugin_paths = [] + + aggregate_failures('setup: resolve plugin paths') do + setup_plugin_list.each do |requested_plugin| + found = Dir.glob(File.join(search_dir, "#{requested_plugin}-*.gem")) + expect(found).to have_attributes(:size => 1), lambda { "expected exactly one `#{requested_plugin}` in `#{search_dir}`, got #{found.inspect}" } + plugin_paths << found.first + end + end + + aggregate_failures('setup: installing plugins') do + puts "installing plugins #{plugin_paths.inspect}" + outcome = @logstash_plugin.install(*plugin_paths) + + expect(outcome.exit_code).to eq(0) + expect(outcome.stderr_and_stdout).to match(/Installation successful/) + end + end + end + + context "when a remaining plugin has a dependency on a removed plugin" do + let(:setup_plugin_list) do + %w( + logstash-filter-zero_no_dependencies + logstash-filter-one_no_dependencies + logstash-filter-two_depends_on_one + logstash-filter-three_no_dependencies + logstash-filter-four_depends_on_one_and_three + ) + end + it "errors helpfully without removing any of the plugins" do + execute = @logstash_plugin.remove("logstash-filter-three_no_dependencies", "logstash-filter-zero_no_dependencies") + + expect(execute.exit_code).to eq(1) + expect(execute.stderr_and_stdout).to include('Failed to remove "logstash-filter-three_no_dependencies"') + expect(execute.stderr_and_stdout).to include("* logstash-filter-four_depends_on_one_and_three") # one of the dependency + expect(execute.stderr_and_stdout).to include("No plugins were removed.") + + aggregate_failures("list plugins") do + presence_check = @logstash_plugin.list + expect(presence_check.exit_code).to eq(0) + expect(presence_check.stderr_and_stdout).to include('logstash-filter-three_no_dependencies') + expect(presence_check.stderr_and_stdout).to 
include('logstash-filter-zero_no_dependencies') + end + end + end + context "when multiple remaining plugins have a dependency on a removed plugin" do + let(:setup_plugin_list) do + %w( + logstash-filter-zero_no_dependencies + logstash-filter-one_no_dependencies + logstash-filter-two_depends_on_one + logstash-filter-three_no_dependencies + logstash-filter-four_depends_on_one_and_three + ) + end + it "errors helpfully without removing any of the plugins" do + execute = @logstash_plugin.remove("logstash-filter-one_no_dependencies", "logstash-filter-zero_no_dependencies") + + expect(execute.exit_code).to eq(1) + expect(execute.stderr_and_stdout).to include('Failed to remove "logstash-filter-one_no_dependencies"') + expect(execute.stderr_and_stdout).to include("* logstash-filter-four_depends_on_one_and_three") # one of the dependency + expect(execute.stderr_and_stdout).to include("* logstash-filter-two_depends_on_one") # one of the dependency + expect(execute.stderr_and_stdout).to include("No plugins were removed.") + + aggregate_failures("list plugins") do + presence_check = @logstash_plugin.list + expect(presence_check.exit_code).to eq(0) + expect(presence_check.stderr_and_stdout).to include('logstash-filter-one_no_dependencies') + expect(presence_check.stderr_and_stdout).to include('logstash-filter-zero_no_dependencies') + end + end + end + context "when removing plugins and all plugins that depend on them" do + let(:setup_plugin_list) do + %w( + logstash-filter-zero_no_dependencies + logstash-filter-one_no_dependencies + logstash-filter-two_depends_on_one + logstash-filter-three_no_dependencies + logstash-filter-four_depends_on_one_and_three + ) + end + it "removes the plugins" do + plugins_to_remove = %w( + logstash-filter-one_no_dependencies + logstash-filter-two_depends_on_one + logstash-filter-three_no_dependencies + logstash-filter-four_depends_on_one_and_three + ).shuffle #random order + execute = @logstash_plugin.remove(*plugins_to_remove) + + 
aggregate_failures("removal action") do + expect(execute).to have_attributes(:exit_code => 0, :stderr_and_stdout => include("Success")) + plugins_to_remove.each do |gem_name| + expect(execute.stderr_and_stdout).to include("Successfully removed #{gem_name}") + end + end + + aggregate_failures("list plugins") do + presence_check = @logstash_plugin.list + expect(presence_check.exit_code).to eq(0) + aggregate_failures("removed plugins") do + plugins_to_remove.each do |expected_removed_plugin| + expect(presence_check.stderr_and_stdout).to_not include(expected_removed_plugin) + end + end + aggregate_failures("non-removed plugins") do + (setup_plugin_list - plugins_to_remove).each do |expected_remaining_plugin| + expect(presence_check.stderr_and_stdout).to include(expected_remaining_plugin) + end + end + end + end + end + end end end diff --git a/rakelib/plugin.rake b/rakelib/plugin.rake index ffd7a1cbf80..a41c8fa1e90 100644 --- a/rakelib/plugin.rake +++ b/rakelib/plugin.rake @@ -24,9 +24,9 @@ namespace "plugin" do LogStash::PluginManager::Main.run("bin/logstash-plugin", ["install"] + args) end - def remove_plugin(plugin) + def remove_plugin(plugin, *more_plugins) require_relative "../lib/pluginmanager/main" - LogStash::PluginManager::Main.run("bin/logstash-plugin", ["remove", plugin]) + LogStash::PluginManager::Main.run("bin/logstash-plugin", ["remove", plugin] + more_plugins) end task "install-base" => "bootstrap" do From e094054c0e99382aa3e2c666cbd2f1046c1e1da2 Mon Sep 17 00:00:00 2001 From: Cas Donoghue Date: Wed, 19 Feb 2025 15:44:29 -0800 Subject: [PATCH 23/27] Fix acceptance test assertions for updated plugin `remove` (#17122) This commit updates the acceptance tests to expect messages in the updated format for removing plugins. See https://github.com/elastic/logstash/pull/17030 for change. 
--- .../shared_examples/cli/logstash-plugin/integration_plugin.rb | 2 +- .../spec/shared_examples/cli/logstash-plugin/uninstall.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/integration_plugin.rb b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/integration_plugin.rb index c37d8652fb5..02afb1e3ed0 100644 --- a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/integration_plugin.rb +++ b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/integration_plugin.rb @@ -75,7 +75,7 @@ context "trying to uninstall an inner plugin" do it "fails to uninstall it" do result = logstash.run_sudo_command_in_path("bin/logstash-plugin uninstall logstash-input-rabbitmq") - expect(result.stderr).to match(/is already provided by/) + expect(result.stderr).to include("The plugin `logstash-input-rabbitmq` is provided by 'logstash-integration-rabbitmq' so it can't be removed individually") end end end diff --git a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/uninstall.rb b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/uninstall.rb index ea379008f49..d485c03ca8a 100644 --- a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/uninstall.rb +++ b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/uninstall.rb @@ -33,7 +33,7 @@ context "when the plugin isn't installed" do it "fails to uninstall it" do result = logstash.run_sudo_command_in_path("bin/logstash-plugin uninstall logstash-filter-qatest") - expect(result.stderr).to match(/This plugin has not been previously installed/) + expect(result.stderr).to include("The plugin `logstash-filter-qatest` has not been previously installed") end end From e8e24a03972339e03f88278bbf530b63065fa269 Mon Sep 17 00:00:00 2001 From: Cas Donoghue Date: Thu, 20 Feb 2025 07:19:36 -0800 Subject: [PATCH 24/27] Fix acceptance test assertions for updated plugin remove (#17126) This commit updates the acceptance tests to expect messages in the 
updated format for removing plugins. See https://github.com/elastic/logstash/pull/17030 for change. --- .../spec/shared_examples/cli/logstash-plugin/remove.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/remove.rb b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/remove.rb index aeae0857f7f..ac3c6bd1b3a 100644 --- a/qa/acceptance/spec/shared_examples/cli/logstash-plugin/remove.rb +++ b/qa/acceptance/spec/shared_examples/cli/logstash-plugin/remove.rb @@ -33,7 +33,7 @@ context "when the plugin isn't installed" do it "fails to remove it" do result = logstash.run_sudo_command_in_path("bin/logstash-plugin remove logstash-filter-qatest") - expect(result.stderr).to match(/This plugin has not been previously installed/) + expect(result.stderr).to include("The plugin `logstash-filter-qatest` has not been previously installed") end end From 91258c3f9860c02f5df5857ca7e97d56fd3183a2 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 20 Feb 2025 12:55:40 -0800 Subject: [PATCH 25/27] entrypoint: avoid polluting stdout (#17125) routes output from setup-related functions to stderr, so that stdout can include only the output of the actual program. 
--- bin/logstash.bat | 2 +- bin/logstash.lib.sh | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/logstash.bat b/bin/logstash.bat index 44fff6f6007..7015e856815 100644 --- a/bin/logstash.bat +++ b/bin/logstash.bat @@ -6,7 +6,7 @@ set params='%*' if "%1" == "-V" goto version if "%1" == "--version" goto version -call "%~dp0setup.bat" || exit /b 1 +1>&2 (call "%~dp0setup.bat") || exit /b 1 if errorlevel 1 ( if not defined nopauseonerror ( pause diff --git a/bin/logstash.lib.sh b/bin/logstash.lib.sh index 3c4da9070f5..388e9650d0a 100755 --- a/bin/logstash.lib.sh +++ b/bin/logstash.lib.sh @@ -186,8 +186,8 @@ setup_vendored_jruby() { } setup() { - setup_java - setup_vendored_jruby + >&2 setup_java + >&2 setup_vendored_jruby } ruby_exec() { From 227c0d815025c2dcdbc1fd068906cb7a8258027b Mon Sep 17 00:00:00 2001 From: Cas Donoghue Date: Fri, 21 Feb 2025 10:40:43 -0800 Subject: [PATCH 26/27] Update container acceptance tests with stdout/stderr changes (#17138) In https://github.com/elastic/logstash/pull/17125 jvm setup was redirected to stderr to avoid polluting stdout. This test was actually having to do some additional processing to parse that information. Now that we have split the destinations the tests can be simplified to look for the data they are trying to validate on the appropriate stream. 
--- qa/docker/shared_examples/container.rb | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/qa/docker/shared_examples/container.rb b/qa/docker/shared_examples/container.rb index 72ae9a5c598..a6290e6ec01 100644 --- a/qa/docker/shared_examples/container.rb +++ b/qa/docker/shared_examples/container.rb @@ -15,16 +15,13 @@ it 'should run with the correct version' do console_out = exec_in_container(@container, 'logstash --version') - console_filtered = console_out.split("\n") - .delete_if do |line| - line =~ /Using LS_JAVA_HOME defined java|Using system java: / - end.join - expect(console_filtered).to match /#{version}/ + expect(console_out).to match /#{version}/ end it 'should run with the bundled JDK' do - first_console_line = exec_in_container(@container, 'logstash --version').split("\n")[0] - expect(first_console_line).to match /Using bundled JDK: \/usr\/share\/logstash\/jdk/ + full_command = exec_in_container_full(@container, 'logstash --version') + std_err = full_command[:stderr].join.chomp.strip + expect(std_err).to match /Using bundled JDK: \/usr\/share\/logstash\/jdk/ end it 'should be running an API server on port 9600' do From 4d52b7258db2dfc53631bc995504aef439a2bd82 Mon Sep 17 00:00:00 2001 From: Dimitrios Liappis Date: Mon, 24 Feb 2025 15:29:35 +0200 Subject: [PATCH 27/27] Add Windows 2025 to CI (#17133) This commit adds Windows 2025 to the Windows JDK matrix and exhaustive tests pipelines. 
--- .buildkite/scripts/common/vm-images.json | 2 +- .buildkite/windows_jdk_matrix_pipeline.yml | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.buildkite/scripts/common/vm-images.json b/.buildkite/scripts/common/vm-images.json index 5e34672b85f..bd1f0d3ed42 100644 --- a/.buildkite/scripts/common/vm-images.json +++ b/.buildkite/scripts/common/vm-images.json @@ -9,5 +9,5 @@ "amazonlinux": ["amazonlinux-2023"], "opensuse": ["opensuse-leap-15"] }, - "windows": ["windows-2022", "windows-2019", "windows-2016"] + "windows": ["windows-2025", "windows-2022", "windows-2019", "windows-2016"] } diff --git a/.buildkite/windows_jdk_matrix_pipeline.yml b/.buildkite/windows_jdk_matrix_pipeline.yml index 0f75f1675ff..a1578c5ea1a 100644 --- a/.buildkite/windows_jdk_matrix_pipeline.yml +++ b/.buildkite/windows_jdk_matrix_pipeline.yml @@ -15,6 +15,8 @@ steps: multiple: true default: "${DEFAULT_MATRIX_OS}" options: + - label: "Windows 2025" + value: "windows-2025" - label: "Windows 2022" value: "windows-2022" - label: "Windows 2019"