[WIP] Variable page size for batch PQ writes #9508

Open
wants to merge 2 commits into base: queue-timeouts-plus-batches
35 changes: 17 additions & 18 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -307,14 +307,24 @@ private void addTailPage(Page page) throws IOException {
page.getPageIO().deactivate();
}

/**
* create a new empty headpage of the default size
* @param pageNum the page number of the new head page
* @throws IOException
*/
private void newCheckpointedHeadpage(int pageNum) throws IOException {
newCheckpointedHeadpage(pageNum, 0);
}

/**
* create a new empty headpage for the given pageNum, sized to hold at least minimumDataCapacity bytes of element data, and immediately checkpoint it
*
* @param pageNum the page number of the new head page
* @param minimumDataCapacity the minimum number of bytes of element data the new page must be able to hold
* @throws IOException
*/
private void newCheckpointedHeadpage(int pageNum) throws IOException {
PageIO headPageIO = new MmapPageIO(pageNum, this.pageCapacity, this.dirPath);
private void newCheckpointedHeadpage(int pageNum, int minimumDataCapacity) throws IOException {
PageIO headPageIO = MmapPageIO.makeSuitablySizedPage(pageNum, minimumDataCapacity, pageCapacity, this.dirPath);

headPageIO.create();
this.headPage = PageFactory.newHeadPage(pageNum, this, headPageIO);
this.headPage.forceCheckpoint();
@@ -386,22 +396,12 @@ public long write(List<Queueable> queueables, long timeoutMillis) throws IOException {
// element at risk in the always-full queue state. In the latter, when closing a full queue, it would be impossible
// to write the current element.

final int headPageCapacity = this.headPage.getPageIO().getCapacity();
// Our batch might span multiple pages, so we have to break it up if it's too big

final int persistedBatchSize = this.headPage.getPageIO().persistedByteCount(dataSize, serializedQueueables.size());

if (persistedBatchSize <= headPageCapacity) {


try {
return unsafeQueueWrite(serializedQueueables, timeoutMillis, persistedBatchSize);
} finally {
lock.unlock();
}
} else {
// Batch is larger than a page, we can't do an atomic write
throw new IOException("data to be written is bigger than page capacity");
try {
return unsafeQueueWrite(serializedQueueables, timeoutMillis, persistedBatchSize);
} finally {
lock.unlock();
}
}

@@ -457,8 +457,7 @@ private long unsafeQueueWrite(List<byte[]> serializedQueueables, long timeoutMillis, int totalPersistedSize) throws IOException {
behead();
}

// create new head page
newCheckpointedHeadpage(newHeadPageNum);
newCheckpointedHeadpage(newHeadPageNum, totalPersistedSize);
}

long minWrittenSeqNum = this.seqNum + 1;
logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIO.java
@@ -3,13 +3,12 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.BufferOverflowException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;
import org.apache.logging.log4j.LogManager;
@@ -27,6 +26,7 @@ public final class MmapPageIO implements PageIO {
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
public static final int HEADER_SIZE = 1; // version byte
public static final boolean VERIFY_CHECKSUM = true;
public static final int METADATA_SIZE = SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;

private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class);

@@ -41,6 +41,7 @@ public final class MmapPageIO implements PageIO {
private final CRC32 checkSummer;

private final IntVector offsetMap;
private final int dataCapacity;

private int capacity; // page capacity is an int per the ByteBuffer class.
private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
@@ -50,17 +51,53 @@

private MappedByteBuffer buffer;

public MmapPageIO(int pageNum, int capacity, Path dirPath) {
/**
 * Constructor that sets the page number, the minimum size of the backing file, and its directory path.
 * If the file on disk already exists and is larger than the minimum capacity, that existing size
 * is used instead.
 * @param pageNum the page number, used to name the on-disk page file
 * @param minimumFileSize the minimum size of the backing file, in bytes
 * @param dirPath the directory containing the page files
 */
public MmapPageIO(int pageNum, int minimumFileSize, Path dirPath) {
this.minSeqNum = 0;
this.elementCount = 0;
this.version = 0;
this.head = 0;
this.capacity = capacity;
this.offsetMap = new IntVector();
this.checkSummer = new CRC32();
this.file = dirPath.resolve("page." + pageNum).toFile();
int existingFileSize = (int) file.length();
Review comment (Member):
@andrewvc this is somewhat optimistic I think. We should do a safe cast here (using Guava's Ints, for example) and hard fail if the length exceeds 2GB, because we can't mmap that kind of file.
I know we won't be able to write that kind of file in the first place, so this shouldn't ever come up, but with the amount of weird bugs we ran into I think we should have visibility on that kind of problem if it occurs :)

Reply (Contributor Author):
+1 to handling the edge cases.
// Account for the version byte at the head of the file
int existingFileCapacity = existingFileSize - HEADER_SIZE;
// If this file has been created with a larger capacity than our default, use that
if (existingFileSize > minimumFileSize) {
    this.capacity = existingFileSize;
    this.dataCapacity = existingFileCapacity;
} else {
    this.capacity = minimumFileSize;
    this.dataCapacity = minimumFileSize - HEADER_SIZE;
}
}
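
A minimal sketch of the safe cast suggested in the review comment above, assuming Guava's Ints is on the classpath; the helper class name and error message are illustrative, not part of this PR:

import com.google.common.primitives.Ints;

import java.io.File;

final class PageFileSizes {
    // Hypothetical helper: narrow a page file's length to int, failing loudly
    // instead of silently truncating. A file of 2GB or more cannot be mapped
    // into a single MappedByteBuffer, so a hard failure makes the problem visible.
    static int checkedFileSize(File file) {
        long length = file.length();
        try {
            // Ints.checkedCast throws IllegalArgumentException when the value
            // does not fit in a signed 32-bit int
            return Ints.checkedCast(length);
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException(
                "Page file " + file + " is " + length + " bytes, which exceeds the mmap limit", e);
        }
    }
}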

/**
 * Makes a new page IO whose file is at least minimumPageCapacity bytes in total, and that can
 * definitely hold minimumPageDataCapacity bytes of persisted element data.
 * @param pagenum the page number
 * @param minimumPageDataCapacity the minimum number of bytes of persisted element data the page must hold
 * @param minimumPageCapacity the minimum total size of the page file, in bytes
 * @param dirPath the directory containing the page files
 * @return a suitably sized MmapPageIO
 */
public static MmapPageIO makeSuitablySizedPage(int pagenum, int minimumPageDataCapacity, int minimumPageCapacity, Path dirPath) {
// Pick the larger of the two potential page sizes
if (minimumPageDataCapacity >= minimumPageCapacity) {
minimumPageCapacity = minimumPageDataCapacity + HEADER_SIZE;
}

return new MmapPageIO(pagenum, minimumPageCapacity, dirPath);
}
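
For intuition, the sizing rule above plays out like this (a standalone sketch with illustrative values; HEADER_SIZE is the 1-byte version header):

public class PageSizingExample {
    // Mirrors the comparison in makeSuitablySizedPage
    static int suitableCapacity(int minimumDataCapacity, int minimumPageCapacity, int headerSize) {
        if (minimumDataCapacity >= minimumPageCapacity) {
            return minimumDataCapacity + headerSize; // grow the page to data plus header
        }
        return minimumPageCapacity; // the default capacity already fits the batch
    }

    public static void main(String[] args) {
        final int HEADER_SIZE = 1;        // version byte, as in MmapPageIO
        final int defaultCapacity = 1000; // illustrative configured page capacity
        System.out.println(suitableCapacity(300, defaultCapacity, HEADER_SIZE));  // 1000: batch fits
        System.out.println(suitableCapacity(2500, defaultCapacity, HEADER_SIZE)); // 2501: page grows
    }
}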


@Override
public void open(long minSeqNum, int elementCount) throws IOException {
mapFile();
@@ -227,10 +264,15 @@ public void write(final List<byte[]> bytesList, final long minSeqNum) {

newOffsets[i] = mainBufferOffset;

buffer.putLong(seqNum);
buffer.putInt(bytes.length);
buffer.put(bytes);
buffer.putInt(checksum(bytes));
try {
    buffer.putLong(seqNum);
    buffer.putInt(bytes.length);
    buffer.put(bytes);
    buffer.putInt(checksum(bytes));
} catch (BufferOverflowException e) {
    LOGGER.error("Page buffer overflow while writing element, head: {}, capacity: {}", head, capacity, e);
    throw e;
}

mainBufferOffset += persistedByteCount(bytes.length, 1);
}
@@ -308,7 +350,7 @@ public boolean hasSpace(int bytes) {

@Override
public int persistedByteCount(int byteCount, int numElements) {
int metadataBytes = (SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE) * numElements;
int metadataBytes = METADATA_SIZE * numElements;
return byteCount + metadataBytes;
}
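
As a worked example of this overhead accounting (assuming the field sizes used elsewhere in this class: an 8-byte sequence number, a 4-byte length, and a 4-byte checksum, so METADATA_SIZE is 16 bytes per element):

import java.util.Arrays;
import java.util.List;

public class PersistedSizeExample {
    static final int METADATA_SIZE = 8 + 4 + 4; // seqnum + length + checksum, per element

    // Same arithmetic as persistedByteCount(byteCount, numElements)
    static int persistedByteCount(int byteCount, int numElements) {
        return byteCount + METADATA_SIZE * numElements;
    }

    public static void main(String[] args) {
        List<byte[]> batch = Arrays.asList(new byte[100], new byte[100], new byte[100]);
        int dataSize = batch.stream().mapToInt(b -> b.length).sum();
        // 3 * 100 data bytes + 3 * 16 metadata bytes = 348 persisted bytes
        System.out.println(persistedByteCount(dataSize, batch.size()));
    }
}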

39 changes: 31 additions & 8 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -17,6 +17,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -96,18 +97,40 @@ public void singleWriteRead() throws IOException {
public void batchWriteRead() throws IOException {
// We need a fairly large size because reads don't span pages
try (Queue q = new Queue(TestSettings.persistedQueueSettings(1000, dataPath))) {
q.open();
assertThatIsReadableAllAtOnce(batch, q);
}
}

@Test
public void batchWritePageLargerThanConfiguredPageCapacity() throws IOException {
List<Queueable> tooLargeBatch = new ArrayList<>();
int batchSize = 0;
int pageCapacity = 1000;
int i = 0;
while (batchSize < pageCapacity) {
Queueable element = new StringElement("Hello There " + i);
tooLargeBatch.add(element);
batchSize += element.serialize().length;
i++;
}

long res = q.write(batch);
try (Queue q = new Queue(TestSettings.persistedQueueSettings(pageCapacity, dataPath))) {
assertThatIsReadableAllAtOnce(tooLargeBatch, q);
}
}

Batch b = q.nonBlockReadBatch(batch.size());
private void assertThatIsReadableAllAtOnce(List<Queueable> tooLargeBatch, Queue q) throws IOException {
q.open();

assertThat(b.getElements().size(), is(batch.size()));
for (int i = 0; i < batch.size(); i++) {
assertThat(b.getElements().get(i).toString(), is(batch.get(i).toString()));
}
assertThat(q.nonBlockReadBatch(1), nullValue());
long res = q.write(tooLargeBatch);

Batch b = q.nonBlockReadBatch(tooLargeBatch.size());

assertThat(b.getElements().size(), is(tooLargeBatch.size()));
for (int i = 0; i < tooLargeBatch.size(); i++) {
assertThat(b.getElements().get(i).toString(), is(tooLargeBatch.get(i).toString()));
}
assertThat(q.nonBlockReadBatch(1), nullValue());
}

/**