-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string #16968
Changes from all commits
2382fee
339d645
0a910a2
33dc6b1
674d377
5af3a62
0f76bd0
a5d6bef
b42ca05
015ba43
727e3b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,14 +23,18 @@ | |
import org.jruby.Ruby; | ||
import org.jruby.RubyArray; | ||
import org.jruby.RubyClass; | ||
import org.jruby.RubyEncoding; | ||
import org.jruby.RubyObject; | ||
import org.jruby.RubyString; | ||
import org.jruby.anno.JRubyClass; | ||
import org.jruby.anno.JRubyMethod; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.jruby.util.ByteList; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.nio.charset.Charset; | ||
|
||
@JRubyClass(name = "BufferedTokenizer") | ||
public class BufferedTokenizerExt extends RubyObject { | ||
|
||
|
@@ -40,10 +44,13 @@ public class BufferedTokenizerExt extends RubyObject { | |
freeze(RubyUtil.RUBY.getCurrentContext()); | ||
|
||
private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); | ||
private StringBuilder headToken = new StringBuilder(); | ||
private RubyString delimiter = NEW_LINE; | ||
private int sizeLimit; | ||
private boolean hasSizeLimit; | ||
private int inputSize; | ||
private boolean bufferFullErrorNotified = false; | ||
private String encodingName; | ||
|
||
public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { | ||
super(runtime, metaClass); | ||
|
@@ -80,23 +87,76 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { | |
@JRubyMethod | ||
@SuppressWarnings("rawtypes") | ||
public RubyArray extract(final ThreadContext context, IRubyObject data) { | ||
RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); | ||
encodingName = encoding.getEncoding().getCharsetName(); | ||
final RubyArray entities = data.convertToString().split(delimiter, -1); | ||
if (!bufferFullErrorNotified) { | ||
input.clear(); | ||
input.concat(entities); | ||
} else { | ||
// after a full buffer signal | ||
if (input.isEmpty()) { | ||
// after a buffer full error, the remaining part of the line, till next delimiter, | ||
// has to be consumed, unless the input buffer doesn't still contain fragments of | ||
// subsequent tokens. | ||
entities.shift(context); | ||
input.concat(entities); | ||
} else { | ||
// merge last of the input with first of incoming data segment | ||
if (!entities.isEmpty()) { | ||
RubyString last = ((RubyString) input.pop(context)); | ||
RubyString nextFirst = ((RubyString) entities.shift(context)); | ||
entities.unshift(last.concat(nextFirst)); | ||
input.concat(entities); | ||
} | ||
} | ||
} | ||
|
||
if (hasSizeLimit) { | ||
final int entitiesSize = ((RubyString) entities.first()).size(); | ||
if (bufferFullErrorNotified) { | ||
bufferFullErrorNotified = false; | ||
if (input.isEmpty()) { | ||
return RubyUtil.RUBY.newArray(); | ||
} | ||
} | ||
final int entitiesSize = ((RubyString) input.first()).size(); | ||
if (inputSize + entitiesSize > sizeLimit) { | ||
throw new IllegalStateException("input buffer full"); | ||
bufferFullErrorNotified = true; | ||
headToken = new StringBuilder(); | ||
String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); | ||
inputSize = 0; | ||
input.shift(context); // consume the token fragment that generates the buffer full | ||
throw new IllegalStateException(errorMessage); | ||
} | ||
this.inputSize = inputSize + entitiesSize; | ||
} | ||
input.append(entities.shift(context)); | ||
if (entities.isEmpty()) { | ||
|
||
if (input.getLength() < 2) { | ||
// this is a specialization case which avoid adding and removing from input accumulator | ||
// when it contains just one element | ||
headToken.append(input.shift(context)); // remove head | ||
return RubyUtil.RUBY.newArray(); | ||
} | ||
entities.unshift(input.join(context)); | ||
input.clear(); | ||
input.append(entities.pop(context)); | ||
inputSize = ((RubyString) input.first()).size(); | ||
return entities; | ||
|
||
if (headToken.length() > 0) { | ||
// if there is a pending token part, merge it with the first token segment present | ||
// in the accumulator, and clean the pending token part. | ||
headToken.append(input.shift(context)); // append buffer to first element and | ||
// create new RubyString with the data specified encoding | ||
RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); | ||
input.unshift(encodedHeadToken); // reinsert it into the array | ||
headToken = new StringBuilder(); | ||
} | ||
headToken.append(input.pop(context)); // put the leftovers in headToken for later | ||
inputSize = headToken.length(); | ||
return input; | ||
} | ||
|
||
private RubyString toEncodedRubyString(ThreadContext context, String input) { | ||
// Depends on the encodingName being set by the extract method, could potentially raise if not set. | ||
RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName)))); | ||
result.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); | ||
return result; | ||
} | ||
|
||
/** | ||
|
@@ -108,15 +168,30 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { | |
*/ | ||
@JRubyMethod | ||
public IRubyObject flush(final ThreadContext context) { | ||
final IRubyObject buffer = input.join(context); | ||
input.clear(); | ||
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); | ||
headToken = new StringBuilder(); | ||
inputSize = 0; | ||
return buffer; | ||
|
||
// create new RubyString with the last data specified encoding, if exists | ||
RubyString encodedHeadToken; | ||
if (encodingName != null) { | ||
encodedHeadToken = toEncodedRubyString(context, buffer.toString()); | ||
} else { | ||
// When used with TCP input it could be that on socket connection the flush method | ||
// is invoked while no invocation of extract, leaving the encoding name unassigned. | ||
// In such case also the headToken must be empty | ||
if (!buffer.toString().isEmpty()) { | ||
throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there additional context we can add to the exception? Maybe the buffer contents? I would also remove the Is this code path likely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code path should never be executed. If we execute it means that the following conditions are both true:
Regarding the 1, every RubyString has an encoding assigned, in worst case is defaulted to UTF8, so if we pass at line https://github.com/elastic/logstash/pull/16968/files#diff-5c7f8990e98f54782395d29b4b1b5b68cf6f782b34af5eb8f1b5a77331e0172eR86 we have an encoding. |
||
} | ||
encodedHeadToken = (RubyString) buffer; | ||
} | ||
|
||
return encodedHeadToken; | ||
} | ||
|
||
@JRubyMethod(name = "empty?") | ||
public IRubyObject isEmpty(final ThreadContext context) { | ||
return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0)); | ||
return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.logstash.common; | ||
|
||
import org.jruby.RubyArray; | ||
import org.jruby.RubyEncoding; | ||
import org.jruby.RubyString; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.logstash.RubyTestBase; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.util.List; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.logstash.RubyUtil.RUBY; | ||
|
||
@SuppressWarnings("unchecked") | ||
public final class BufferedTokenizerExtTest extends RubyTestBase { | ||
|
||
private BufferedTokenizerExt sut; | ||
private ThreadContext context; | ||
|
||
@Before | ||
public void setUp() { | ||
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); | ||
context = RUBY.getCurrentContext(); | ||
IRubyObject[] args = {}; | ||
sut.init(context, args); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeASingleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); | ||
|
||
assertEquals(List.of("foo"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldMergeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n")); | ||
assertEquals(List.of("foobar"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); | ||
|
||
assertEquals(List.of("foo", "bar"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldIgnoreEmptyPayload() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); | ||
assertEquals(List.of("foo"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeEmptyPayloadWithNewline() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n")); | ||
assertEquals(List.of(""), tokens); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); | ||
assertEquals(List.of("", "", ""), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldNotChangeEncodingOfTokensAfterPartitioning() { | ||
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A | ||
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput); | ||
|
||
// read the first token, the £ string | ||
IRubyObject firstToken = tokens.shift(context); | ||
assertEquals("£", firstToken.toString()); | ||
|
||
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion | ||
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); | ||
assertEquals("ISO-8859-1", encoding.toString()); | ||
} | ||
|
||
@Test | ||
public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { | ||
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character | ||
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); | ||
sut.extract(context, rubyInput); | ||
IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) | ||
.force_encoding(context, RUBY.newString("ISO8859-1")); | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, capitalAInLatin1); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); | ||
|
||
// read the first token, the £ string | ||
IRubyObject firstToken = tokens.shift(context); | ||
assertEquals("£A", firstToken.toString()); | ||
|
||
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion | ||
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); | ||
assertEquals("ISO-8859-1", encoding.toString()); | ||
} | ||
|
||
@Test | ||
public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { | ||
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A | ||
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput); | ||
|
||
// read the first token, the £ string | ||
IRubyObject firstToken = tokens.shift(context); | ||
assertEquals("£", firstToken.toString()); | ||
|
||
// flush and check that the remaining A is still encoded in ISO8859-1 | ||
IRubyObject lastToken = sut.flush(context); | ||
assertEquals("A", lastToken.toString()); | ||
|
||
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion | ||
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); | ||
assertEquals("ISO-8859-1", encoding.toString()); | ||
} | ||
|
||
@Test | ||
public void givenDirectFlushInvocationUTF8EncodingIsApplied() { | ||
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A | ||
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); | ||
|
||
// flush and check that the remaining A is still encoded in ISO8859-1 | ||
IRubyObject lastToken = sut.flush(context); | ||
assertEquals("", lastToken.toString()); | ||
|
||
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion | ||
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); | ||
assertEquals("UTF-8", encoding.toString()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to Elasticsearch B.V. under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch B.V. licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.logstash.common; | ||
|
||
import org.jruby.RubyArray; | ||
import org.jruby.RubyString; | ||
import org.jruby.runtime.ThreadContext; | ||
import org.jruby.runtime.builtin.IRubyObject; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.logstash.RubyTestBase; | ||
import org.logstash.RubyUtil; | ||
|
||
import java.util.List; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.logstash.RubyUtil.RUBY; | ||
|
||
@SuppressWarnings("unchecked") | ||
public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { | ||
|
||
private BufferedTokenizerExt sut; | ||
private ThreadContext context; | ||
|
||
@Before | ||
public void setUp() { | ||
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); | ||
context = RUBY.getCurrentContext(); | ||
IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; | ||
sut.init(context, args); | ||
} | ||
|
||
@Test | ||
public void shouldTokenizeMultipleToken() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); | ||
|
||
assertEquals(List.of("foo", "b|r"), tokens); | ||
} | ||
|
||
@Test | ||
public void shouldIgnoreEmptyPayload() { | ||
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("")); | ||
assertTrue(tokens.isEmpty()); | ||
|
||
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); | ||
assertEquals(List.of("foo"), tokens); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah shift... AFAIK it is O(N) operation, I wonder if there is improvement we can do...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you refer to
unshift
, in the existing code we always invokeso we shouldn't have introduced any bottleneck that wasn't present in previous version.