From d2ca40a8f9a1f343371fdd4d611683e346ea2df7 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Wed, 5 Feb 2025 10:25:08 +0100 Subject: [PATCH] Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968) Permits effective use of the tokenizer even in contexts where a line is bigger than the limit. Fixes an issue related to the token size limit error: when the offending token was bigger than the input fragment, it happened that the tokenizer was unable to recover the token stream from the first delimiter after the offending token, and instead mixed things up, losing some of the tokens. ## How it solves the problem This is a second take at fixing the processing of tokens from the tokenizer after a buffer full error. The first attempt #16482 was rolled back due to the encoding error #16694: it failed to return the tokens in the same encoding as the input. This PR does a couple of things: - accumulates the tokens, so that after a buffer-full condition it can resume with the next tokens after the offending one. - respects the encoding of the input string. It uses the `concat` method instead of `addAll`, which avoids converting RubyString to String and back to RubyString. When returning the head `StringBuilder` it enforces the encoding with the input charset. 
(cherry picked from commit 1c8cf546c20517f5aba706ebdf177c1b9c2bd9d6) --- .../logstash/common/BufferedTokenizerExt.java | 101 +++++++++-- .../common/BufferedTokenizerExtTest.java | 161 ++++++++++++++++++ ...BufferedTokenizerExtWithDelimiterTest.java | 66 +++++++ ...BufferedTokenizerExtWithSizeLimitTest.java | 111 ++++++++++++ 4 files changed, 426 insertions(+), 13 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index be1c64d2356..e2c476520c1 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -23,14 +23,18 @@ import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyClass; +import org.jruby.RubyEncoding; import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.util.ByteList; import org.logstash.RubyUtil; +import java.nio.charset.Charset; + @JRubyClass(name = "BufferedTokenizer") public class BufferedTokenizerExt extends RubyObject { @@ -40,10 +44,13 @@ public class BufferedTokenizerExt extends RubyObject { freeze(RubyUtil.RUBY.getCurrentContext()); private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); + private StringBuilder headToken = new StringBuilder(); private RubyString delimiter = NEW_LINE; private int sizeLimit; private boolean hasSizeLimit; private int inputSize; + private boolean 
bufferFullErrorNotified = false; + private String encodingName; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -80,23 +87,76 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @JRubyMethod @SuppressWarnings("rawtypes") public RubyArray extract(final ThreadContext context, IRubyObject data) { + RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); + encodingName = encoding.getEncoding().getCharsetName(); final RubyArray entities = data.convertToString().split(delimiter, -1); + if (!bufferFullErrorNotified) { + input.clear(); + input.concat(entities); + } else { + // after a full buffer signal + if (input.isEmpty()) { + // after a buffer full error, the remaining part of the line, till next delimiter, + // has to be consumed, unless the input buffer doesn't still contain fragments of + // subsequent tokens. + entities.shift(context); + input.concat(entities); + } else { + // merge last of the input with first of incoming data segment + if (!entities.isEmpty()) { + RubyString last = ((RubyString) input.pop(context)); + RubyString nextFirst = ((RubyString) entities.shift(context)); + entities.unshift(last.concat(nextFirst)); + input.concat(entities); + } + } + } + if (hasSizeLimit) { - final int entitiesSize = ((RubyString) entities.first()).size(); + if (bufferFullErrorNotified) { + bufferFullErrorNotified = false; + if (input.isEmpty()) { + return RubyUtil.RUBY.newArray(); + } + } + final int entitiesSize = ((RubyString) input.first()).size(); if (inputSize + entitiesSize > sizeLimit) { - throw new IllegalStateException("input buffer full"); + bufferFullErrorNotified = true; + headToken = new StringBuilder(); + String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); + inputSize = 0; + input.shift(context); // consume the token fragment 
that generates the buffer full + throw new IllegalStateException(errorMessage); } this.inputSize = inputSize + entitiesSize; } - input.append(entities.shift(context)); - if (entities.isEmpty()) { + + if (input.getLength() < 2) { + // this is a specialization case which avoid adding and removing from input accumulator + // when it contains just one element + headToken.append(input.shift(context)); // remove head return RubyUtil.RUBY.newArray(); } - entities.unshift(input.join(context)); - input.clear(); - input.append(entities.pop(context)); - inputSize = ((RubyString) input.first()).size(); - return entities; + + if (headToken.length() > 0) { + // if there is a pending token part, merge it with the first token segment present + // in the accumulator, and clean the pending token part. + headToken.append(input.shift(context)); // append buffer to first element and + // create new RubyString with the data specified encoding + RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); + input.unshift(encodedHeadToken); // reinsert it into the array + headToken = new StringBuilder(); + } + headToken.append(input.pop(context)); // put the leftovers in headToken for later + inputSize = headToken.length(); + return input; + } + + private RubyString toEncodedRubyString(ThreadContext context, String input) { + // Depends on the encodingName being set by the extract method, could potentially raise if not set. 
+ RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName)))); + result.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + return result; } /** @@ -108,15 +168,30 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { */ @JRubyMethod public IRubyObject flush(final ThreadContext context) { - final IRubyObject buffer = input.join(context); - input.clear(); + final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); + headToken = new StringBuilder(); inputSize = 0; - return buffer; + + // create new RubyString with the last data specified encoding, if exists + RubyString encodedHeadToken; + if (encodingName != null) { + encodedHeadToken = toEncodedRubyString(context, buffer.toString()); + } else { + // When used with TCP input it could be that on socket connection the flush method + // is invoked while no invocation of extract, leaving the encoding name unassigned. + // In such case also the headToken must be empty + if (!buffer.toString().isEmpty()) { + throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); + } + encodedHeadToken = (RubyString) buffer; + } + + return encodedHeadToken; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0)); + return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..524abb36ed5 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyEncoding; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeASingleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) 
sut.extract(context, RubyUtil.RUBY.newString("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n")); + assertEquals(List.of(""), tokens); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioning() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + sut.extract(context, rubyInput); + IRubyObject capitalAInLatin1 = 
RubyString.newString(RUBY, new byte[]{(byte) 0x41}) + .force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£A", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("A", lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void givenDirectFlushInvocationUTF8EncodingIsApplied() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("", 
lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("UTF-8", encoding.toString()); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java new file mode 100644 index 00000000000..19872e66c3c --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + + assertEquals(List.of("foo", "b|r"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + assertEquals(List.of("foo"), tokens); + } +} diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java new file mode 100644 index 00000000000..9a07242369d --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. 
licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.*; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; + sut.init(context, args); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, 
RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); + assertEquals(List.of("bbbb"), tokens); + } + + @Test + public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + //first buffer full on 13 "a" letters + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + // second buffer full on 11 "b" letters + Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); + }); + 
assertThat(secondThrownException.getMessage(), containsString("input buffer full")); + + // now should resemble processing on c and d + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); + assertEquals(List.of("ccccc", "ddd"), tokens); + } +} \ No newline at end of file