Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add LZ4 frame compression
Browse files Browse the repository at this point in the history
findepi committed Apr 12, 2022
1 parent 02ade1b commit 9103d8e
Showing 9 changed files with 766 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -107,7 +107,7 @@
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
<scope>test</scope>
<!-- TODO move to test scope once we have XxHash32 -->
</dependency>

<dependency>
137 changes: 137 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Compressor;

import java.nio.Buffer;
import java.nio.ByteBuffer;

import static io.airlift.compress.lz4.Lz4RawCompressor.MAX_TABLE_SIZE;
import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

/**
* This class is not thread-safe
*/
public class Lz4FrameCompressor
implements Compressor
{
private final int[] table = new int[MAX_TABLE_SIZE];

@Override
public int maxCompressedLength(int uncompressedSize)
{
return Lz4FrameRawCompressor.maxCompressedLength(uncompressedSize);
}

@Override
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
{
verifyRange(input, inputOffset, inputLength);
verifyRange(output, outputOffset, maxOutputLength);

long inputAddress = ARRAY_BYTE_BASE_OFFSET + inputOffset;
long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset;

return Lz4FrameRawCompressor.compress(
input,
inputAddress,
inputLength,
output,
outputAddress,
maxOutputLength,
table);
}

@Override
public void compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
{
if (true) {
// TODO support byte buffers, see disabled tests
throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
}

// Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
// the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
// methods through the interface class works around the problem
// Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
Buffer input = inputBuffer;
Buffer output = outputBuffer;

Object inputBase;
long inputAddress;
int inputLimit;
if (input.isDirect()) {
inputBase = null;
long address = getAddress(input);
inputAddress = address + input.position();
inputLimit = input.limit();
}
else if (input.hasArray()) {
inputBase = input.array();
inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
inputLimit = input.limit();
}
else {
throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
}

Object outputBase;
long outputAddress;
int outputLimit;
if (output.isDirect()) {
outputBase = null;
long address = getAddress(output);
outputAddress = address + output.position();
outputLimit = output.limit();
}
else if (output.hasArray()) {
outputBase = output.array();
outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
outputLimit = output.limit();
}
else {
throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
}

// HACK: Assure JVM does not collect Slice wrappers while compressing, since the
// collection may trigger freeing of the underlying memory resulting in a segfault
// There is no other known way to signal to the JVM that an object should not be
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
synchronized (input) {
synchronized (output) {
int written = Lz4FrameRawCompressor.compress(
inputBase,
inputAddress,
inputLimit,
outputBase,
outputAddress,
outputLimit,
table);
output.position(output.position() + written);
}
}
}

private static void verifyRange(byte[] data, int offset, int length)
{
requireNonNull(data, "data is null");
if (offset < 0 || length < 0 || offset + length > data.length) {
throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
}
}
}
123 changes: 123 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.Decompressor;
import io.airlift.compress.MalformedInputException;

import java.nio.Buffer;
import java.nio.ByteBuffer;

import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

public class Lz4FrameDecompressor
implements Decompressor
{
@Override
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
throws MalformedInputException
{
verifyRange(input, inputOffset, inputLength);
verifyRange(output, outputOffset, maxOutputLength);

return Lz4FrameRawDecompressor.decompress(
input,
ARRAY_BYTE_BASE_OFFSET + inputOffset,
inputLength,
output,
ARRAY_BYTE_BASE_OFFSET + outputOffset,
maxOutputLength);
}

@Override
public void decompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
throws MalformedInputException
{
if (true) {
// TODO support byte buffers, see disabled tests
throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
}

// Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
// the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
// methods through the interface class works around the problem
// Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
Buffer input = inputBuffer;
Buffer output = outputBuffer;

Object inputBase;
long inputAddress;
int inputLimit;
if (input.isDirect()) {
inputBase = null;
long address = getAddress(input);
inputAddress = address + input.position();
inputLimit = input.limit();
}
else if (input.hasArray()) {
inputBase = input.array();
inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
inputLimit = input.limit();
}
else {
throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
}

Object outputBase;
long outputAddress;
int outputLimit;
if (output.isDirect()) {
outputBase = null;
long address = getAddress(output);
outputAddress = address + output.position();
outputLimit = output.limit();
}
else if (output.hasArray()) {
outputBase = output.array();
outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
outputLimit = output.limit();
}
else {
throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
}

// HACK: Assure JVM does not collect Slice wrappers while decompressing, since the
// collection may trigger freeing of the underlying memory resulting in a segfault
// There is no other known way to signal to the JVM that an object should not be
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
synchronized (input) {
synchronized (output) {
int written = Lz4FrameRawDecompressor.decompress(inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit);
output.position(output.position() + written);
}
}
}

public static long getDecompressedSize(byte[] input, int offset, int length)
{
int baseAddress = ARRAY_BYTE_BASE_OFFSET + offset;
return Lz4FrameRawDecompressor.getDecompressedSize(input, baseAddress, length);
}

private static void verifyRange(byte[] data, int offset, int length)
{
requireNonNull(data, "data is null");
if (offset < 0 || length < 0 || offset + length > data.length) {
throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
}
}
}
134 changes: 134 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE;
import static java.lang.Math.toIntExact;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

/**
* Implementation of <a href="https://github.com/lz4/lz4/blob/3e99d07bc09e1b82ee6191527bb3e555052c55ac/doc/lz4_Frame_format.md">LZ4 Frame format</a>.
*/
final class Lz4FrameRawCompressor
{
private Lz4FrameRawCompressor() {}

private static final byte[] MAGIC = {0x04, 0x22, 0x4D, 0x18};

private static final int FRAME_DESCRIPTOR_SIZE =
2 + // FLG byte, BD byte
8 + // content size
1; // HC (Header Checksum)

private static final int FRAME_START_SIZE = 4 /* magic */ + FRAME_DESCRIPTOR_SIZE;
private static final int FRAME_END_SIZE = 4 /* EndMark */;

private static final int BLOCK_MAX_SIZE = 4 * 1024 * 1024;
private static final int BLOCK_MAX_SIZE_MARKER = 7; // per "Block Maximum Size" spec, 7 means 4 MB

public static int maxCompressedLength(int uncompressedSize)
{
return FRAME_START_SIZE + Lz4RawCompressor.maxCompressedLength(uncompressedSize) + FRAME_END_SIZE +
// block sizes
4 * (uncompressedSize / BLOCK_MAX_SIZE + 2);
}

public static int compress(
Object inputBase,
long inputAddress,
int inputLength,
Object outputBase,
long outputAddress,
int maxOutputLength,
int[] table)
{
long originalOutputAddress = outputAddress;

if (maxOutputLength < maxCompressedLength(inputLength)) {
throw new IllegalArgumentException("Max output length must be larger than " + maxCompressedLength(inputLength));
}

UNSAFE.copyMemory(MAGIC, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, MAGIC.length);
outputAddress += MAGIC.length;
maxOutputLength -= MAGIC.length;

byte[] frameDescriptor = new byte[FRAME_DESCRIPTOR_SIZE];

// FLG byte
frameDescriptor[0] =
0b01 << 6 | // Version: 1
1 << 5 | // B.Indep: blocks are independent
0 << 4 | // B.Checksum: no checksum
1 << 3 | // C.Size: content size present
0 << 2 | // C.Checksum: no checksum
0 << 1 | // Reserved
0 << 0; // DictID: no dictionary

// BD byte
frameDescriptor[1] = (BLOCK_MAX_SIZE_MARKER << 4);

// content size
UNSAFE.putLong(frameDescriptor, ARRAY_BYTE_BASE_OFFSET + 2L, inputLength);

// HC (Header Checksum)
XXHash32 xxHash32 = XXHashFactory.fastestInstance().hash32();
byte hc = (byte) ((xxHash32.hash(frameDescriptor, 0, frameDescriptor.length - 1, 0) >> 8) & 0xFF);
frameDescriptor[frameDescriptor.length - 1] = hc;
UNSAFE.copyMemory(frameDescriptor, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, frameDescriptor.length);
outputAddress += frameDescriptor.length;
maxOutputLength -= frameDescriptor.length;

while (inputLength > 0) {
int blockSize = Math.min(inputLength, BLOCK_MAX_SIZE);
int blockHeaderSize = 4;
int compressedSize = Lz4RawCompressor.compress(
inputBase,
inputAddress,
blockSize,
outputBase,
outputAddress + blockHeaderSize,
maxOutputLength - blockHeaderSize,
table);
int uncompressed;
if (compressedSize >= blockSize) {
// incompressible data
uncompressed = 1;
compressedSize = blockSize;
UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress + blockHeaderSize, blockSize);
UNSAFE.putInt(outputBase, outputAddress, (1 << 31) | blockSize);
}
else {
// compressed data, already written to the output
uncompressed = 0;
}

UNSAFE.putInt(outputBase, outputAddress, (uncompressed << 31) | compressedSize);
outputAddress += blockHeaderSize + compressedSize;
maxOutputLength -= blockHeaderSize + compressedSize;

inputAddress += blockSize;
inputLength -= blockSize;
}

// EndMark
UNSAFE.putInt(outputBase, outputAddress, 0);
outputAddress += 4;
maxOutputLength -= 4;

return toIntExact(outputAddress - originalOutputAddress);
}
}
146 changes: 146 additions & 0 deletions src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE;
import static java.lang.Math.toIntExact;

final class Lz4FrameRawDecompressor
{
private Lz4FrameRawDecompressor() {}

private static final int SUPPORTED_FLAGS = 0b01101000;
private static final int SUPPORTED_BD = 0b01110000;

static int decompress(
Object inputBase,
long inputAddress,
int inputLimit,
Object outputBase,
long outputAddress,
int outputLimit)
{
long originalOutputAddress = outputAddress;

checkArgument(inputLimit >= 6, "Not enough input bytes");
checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number");
inputAddress += 4;
inputLimit -= 4;

byte flags = UNSAFE.getByte(inputBase, inputAddress);
byte bd = UNSAFE.getByte(inputBase, inputAddress + 1);
inputAddress += 2;
inputLimit -= 2;

checkArgument(getVersion(flags) == 1, "Unsupported version");
long contentSize = -1;
if (hasContentSize(flags)) {
contentSize = UNSAFE.getLong(inputBase, inputAddress);
inputAddress += 8;
inputLimit -= 8;
checkArgument(contentSize <= outputLimit, "Output buffer too small");
}

checkArgument(inputLimit >= 1, "Not enough input bytes");
byte hc = UNSAFE.getByte(inputBase, inputAddress); // header checksum
inputAddress += 1;
inputLimit -= 1;

checkArgument((flags & ~SUPPORTED_FLAGS) == 0 && areBlocksIndependent(flags), "Unsupported flags");
checkArgument((bd & ~SUPPORTED_BD) == 0, "Invalid BD byte");

while (true) {
checkArgument(inputLimit >= 4, "Not enough input bytes");
int blockSize = UNSAFE.getInt(inputBase, inputAddress);
inputAddress += 4;
inputLimit -= 4;

if (blockSize == 0) {
// EndMark
break;
}

if ((blockSize & (1 << 31)) != 0) {
// uncompressed
blockSize = blockSize & Integer.MAX_VALUE;
checkArgument(inputLimit >= blockSize, "Not enough input bytes");
checkArgument(outputLimit >= blockSize, "Output buffer too small");
UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress, blockSize);
inputAddress += blockSize;
inputLimit -= blockSize;
outputAddress += blockSize;
outputLimit -= blockSize;
}
else {
// compressed
checkArgument(inputLimit >= blockSize, "Not enough input bytes");
int decompressed = Lz4RawDecompressor.decompress(
inputBase,
inputAddress,
inputAddress + blockSize,
outputBase,
outputAddress,
outputAddress + outputLimit);
inputAddress += blockSize;
inputLimit -= blockSize;
outputAddress += decompressed;
outputLimit -= decompressed;
}
}

checkArgument(inputLimit == 0, "Some input not consumed");
int decompressed = toIntExact(outputAddress - originalOutputAddress);
checkArgument(contentSize == -1 || decompressed == contentSize, "Decompressed wrong number of bytes");
return decompressed;
}

public static long getDecompressedSize(Object inputBase, long inputAddress, int inputLimit)
{
checkArgument(inputLimit >= 6, "Not enough input bytes");
checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number");
inputAddress += 4;
inputLimit -= 4;

byte flags = UNSAFE.getByte(inputBase, inputAddress);
// BD byte not read
inputAddress += 2;
inputLimit -= 2;
checkArgument(hasContentSize(flags), "Content size (C.Size) not present");

checkArgument(inputLimit >= 8, "Not enough input bytes");
return UNSAFE.getLong(inputBase, inputAddress);
}

private static int getVersion(byte flags)
{
return flags >> 6;
}

private static boolean hasContentSize(byte flags)
{
return (flags & (1 << 3)) != 0;
}

private static boolean areBlocksIndependent(byte flags)
{
return (flags & (1 << 5)) != 0;
}

private static void checkArgument(boolean condition, String message)
{
if (!condition) {
throw new IllegalArgumentException(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ public final class Lz4RawCompressor
private static final int RUN_BITS = 8 - ML_BITS;
private static final int RUN_MASK = (1 << RUN_BITS) - 1;

private static final int MAX_DISTANCE = ((1 << 16) - 1);
static final int MAX_DISTANCE = ((1 << 16) - 1);

private static final int SKIP_TRIGGER = 6; /* Increase this value ==> compression run slower on incompressible data */

56 changes: 56 additions & 0 deletions src/test/java/io/airlift/compress/lz4/TestLz4Frame.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.lz4;

import io.airlift.compress.AbstractTestCompression;
import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import io.airlift.compress.thirdparty.JPountzLz4FrameCompressor;
import io.airlift.compress.thirdparty.JPountzLz4FrameDecompressor;
import net.jpountz.lz4.LZ4Factory;

public class TestLz4Frame
extends AbstractTestCompression
{
@Override
protected Compressor getCompressor()
{
return new Lz4FrameCompressor();
}

@Override
protected Decompressor getDecompressor()
{
return new Lz4FrameDecompressor();
}

@Override
protected boolean isByteBufferSupported()
{
// TODO support byte buffer
return false;
}

@Override
protected Compressor getVerifyCompressor()
{
return new JPountzLz4FrameCompressor(LZ4Factory.fastestInstance());
}

@Override
protected Decompressor getVerifyDecompressor()
{
return new JPountzLz4FrameDecompressor(LZ4Factory.fastestInstance());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.thirdparty;

import io.airlift.compress.Compressor;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FrameOutputStream;
import net.jpountz.xxhash.XXHashFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

import static java.lang.String.format;

public class JPountzLz4FrameCompressor
implements Compressor
{
private final LZ4Compressor compressor;

public JPountzLz4FrameCompressor(LZ4Factory factory)
{
compressor = factory.fastCompressor();
}

@Override
public int maxCompressedLength(int uncompressedSize)
{
int maxHeaderLength;
try {
Field maxHeaderLengthField = LZ4FrameOutputStream.class.getDeclaredField("LZ4_MAX_HEADER_LENGTH");
maxHeaderLengthField.setAccessible(true);
maxHeaderLength = maxHeaderLengthField.getInt(null);
}
catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
return maxHeaderLength + compressor.maxCompressedLength(uncompressedSize);
}

@Override
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
{
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (LZ4FrameOutputStream compressingOutputStream = new LZ4FrameOutputStream(
outputStream,
LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB,
inputLength,
compressor,
XXHashFactory.fastestInstance().hash32(),
LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE,
LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE)) {
compressingOutputStream.write(input, inputOffset, inputLength);
}
byte[] compressed = outputStream.toByteArray();
if (compressed.length > maxOutputLength) {
throw new IllegalArgumentException(format("Output buffer too small, provided capacity %s, compressed data size %s", maxOutputLength, compressed.length));
}
System.arraycopy(compressed, 0, output, outputOffset, compressed.length);
return compressed.length;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void compress(ByteBuffer input, ByteBuffer output)
{
if (false) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
try (LZ4FrameOutputStream compressingOutputStream = new LZ4FrameOutputStream(
outputStream,
LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB,
input.remaining(),
compressor,
XXHashFactory.fastestInstance().hash32());
WritableByteChannel compressingChannel = Channels.newChannel(compressingOutputStream)) {
compressingChannel.write(input);
}
byte[] compressed = outputStream.toByteArray();
if (compressed.length > output.remaining()) {
throw new IllegalArgumentException(format("Output buffer too small, provided capacity %s, compressed data size %s", output.remaining(), compressed.length));
}
output.put(compressed);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.compress.thirdparty;

import io.airlift.compress.Decompressor;
import io.airlift.compress.MalformedInputException;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHashFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

import static com.google.common.io.ByteStreams.read;

public class JPountzLz4FrameDecompressor
implements Decompressor
{
private final LZ4SafeDecompressor decompressor;

public JPountzLz4FrameDecompressor(LZ4Factory factory)
{
decompressor = factory.safeDecompressor();
}

@Override
public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
throws MalformedInputException
{
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(input, inputOffset, inputLength);
LZ4FrameInputStream decompressingInputStream = new LZ4FrameInputStream(inputStream, decompressor, XXHashFactory.fastestInstance().hash32())) {
return read(decompressingInputStream, output, outputOffset, maxOutputLength);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void decompress(ByteBuffer input, ByteBuffer output)
throws MalformedInputException
{
throw new UnsupportedOperationException();
}
}

0 comments on commit 9103d8e

Please sign in to comment.