Checkpoint bulk rename
First attempt at variable page sizes that passes all tests
andrewvc committed May 1, 2018
1 parent ffedae5 commit 04c0841
Showing 3 changed files with 100 additions and 35 deletions.
40 changes: 22 additions & 18 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -307,14 +307,29 @@ private void addTailPage(Page page) throws IOException {
         page.getPageIO().deactivate();
     }
 
+    /**
+     * create a new empty headpage of the default size
+     * @param pageNum the page number of the new head page
+     * @throws IOException
+     */
+    private void newCheckpointedHeadpage(int pageNum) throws IOException {
+        newCheckpointedHeadpage(pageNum, 0);
+    }
+
     /**
      * create a new empty headpage for the given pageNum and immediately checkpoint it
      *
      * @param pageNum the page number of the new head page
      * @throws IOException
      */
-    private void newCheckpointedHeadpage(int pageNum) throws IOException {
-        PageIO headPageIO = new MmapPageIO(pageNum, this.pageCapacity, this.dirPath);
+    private void newCheckpointedHeadpage(int pageNum, int minimumDataCapacity) throws IOException {
+        PageIO headPageIO;
+        if (minimumDataCapacity > pageCapacity - MmapPageIO.HEADER_SIZE) {
+            headPageIO = MmapPageIO.makeExactlySizedPage(pageNum, minimumDataCapacity, this.dirPath);
+        } else {
+            headPageIO = new MmapPageIO(pageNum, pageCapacity, this.dirPath);
+        }
+
         headPageIO.create();
         this.headPage = PageFactory.newHeadPage(pageNum, this, headPageIO);
         this.headPage.forceCheckpoint();
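
To make the new sizing rule concrete: a head page is created at the default pageCapacity unless the pending batch needs more usable space than pageCapacity minus the one-byte header, in which case the file is sized exactly to fit. A minimal standalone sketch (illustrative only, not part of this commit; HEADER_SIZE mirrors MmapPageIO's single version byte):

public class PageSizingSketch {
    static final int HEADER_SIZE = 1; // version byte, as in MmapPageIO

    // File size to allocate for a new head page that must hold at least
    // minimumDataCapacity bytes of element data.
    static int headPageFileSize(int pageCapacity, int minimumDataCapacity) {
        if (minimumDataCapacity > pageCapacity - HEADER_SIZE) {
            return minimumDataCapacity + HEADER_SIZE; // exactly-sized page
        }
        return pageCapacity; // default-sized page
    }

    public static void main(String[] args) {
        System.out.println(headPageFileSize(1000, 500));  // 1000 (default page)
        System.out.println(headPageFileSize(1000, 2000)); // 2001 (exact fit)
    }
}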
@@ -386,22 +401,12 @@ public long write(List<Queueable> queueables, long timeoutMillis) throws IOException {
         // element at risk in the always-full queue state. In the latter, when closing a full queue, it would be impossible
         // to write the current element.
 
-        final int headPageCapacity = this.headPage.getPageIO().getCapacity();
+        // Our batch might span multiple pages, so we have to break it up if it's too big
 
         final int persistedBatchSize = this.headPage.getPageIO().persistedByteCount(dataSize, serializedQueueables.size());
 
-        if (persistedBatchSize <= headPageCapacity) {
-            try {
-                return unsafeQueueWrite(serializedQueueables, timeoutMillis, persistedBatchSize);
-            } finally {
-                lock.unlock();
-            }
-        } else {
-            // Batch is larger than a page, we can't do an atomic write
-            throw new IOException("data to be written is bigger than page capacity");
-        }
+        try {
+            return unsafeQueueWrite(serializedQueueables, timeoutMillis, persistedBatchSize);
+        } finally {
+            lock.unlock();
+        }
     }
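
The persistedBatchSize used above comes from persistedByteCount, which adds a fixed metadata overhead for each element to the raw payload size. A small self-contained sketch of that arithmetic (the 8/4/4-byte field sizes are assumptions that mirror the long seqNum, int length, and int CRC32 written per element):

public class PersistedSizeSketch {
    static final int SEQNUM_SIZE = 8;   // long sequence number
    static final int LENGTH_SIZE = 4;   // int payload length
    static final int CHECKSUM_SIZE = 4; // int CRC32 checksum
    static final int METADATA_SIZE = SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE; // 16

    static int persistedByteCount(int byteCount, int numElements) {
        return byteCount + METADATA_SIZE * numElements;
    }

    public static void main(String[] args) {
        // 3 elements totalling 100 payload bytes persist as 100 + 3 * 16 = 148 bytes.
        System.out.println(persistedByteCount(100, 3)); // 148
    }
}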

@@ -457,8 +462,7 @@ private long unsafeQueueWrite(List<byte[]> serializedQueueables, long timeoutMillis) throws IOException {
                 behead();
             }
 
-            // create new head page
-            newCheckpointedHeadpage(newHeadPageNum);
+            newCheckpointedHeadpage(newHeadPageNum, totalPersistedSize);
         }
 
         long minWrittenSeqNum = this.seqNum + 1;
56 changes: 47 additions & 9 deletions logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIO.java
@@ -3,13 +3,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
+import java.nio.BufferOverflowException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.zip.CRC32;
 import org.apache.logging.log4j.LogManager;
@@ -27,6 +26,7 @@ public final class MmapPageIO implements PageIO {
     public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus element overhead to hold a single 1-byte element
     public static final int HEADER_SIZE = 1; // version byte
     public static final boolean VERIFY_CHECKSUM = true;
+    public static final int METADATA_SIZE = SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
 
     private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class);

@@ -41,6 +41,7 @@ public final class MmapPageIO implements PageIO {
     private final CRC32 checkSummer;
 
     private final IntVector offsetMap;
+    private final int dataCapacity;
 
     private int capacity; // page capacity is an int per the ByteBuffer class.
     private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
@@ -50,17 +51,49 @@ public final class MmapPageIO implements PageIO {
 
     private MappedByteBuffer buffer;
 
-    public MmapPageIO(int pageNum, int capacity, Path dirPath) {
+    /**
+     * Constructor that sets the page number, the minimum size of the backing file, and its path.
+     * If the file on disk already exists and is larger than the minimum capacity, the on-disk
+     * size is used instead.
+     *
+     * @param pageNum the page number
+     * @param minimumFileSize the minimum size of the page file, in bytes
+     * @param dirPath the directory holding the page files
+     */
+    public MmapPageIO(int pageNum, int minimumFileSize, Path dirPath) {
         this.minSeqNum = 0;
         this.elementCount = 0;
         this.version = 0;
         this.head = 0;
-        this.capacity = capacity;
         this.offsetMap = new IntVector();
         this.checkSummer = new CRC32();
         this.file = dirPath.resolve("page." + pageNum).toFile();
+
+        int existingFileSize = (int) file.length();
+        // Account for the version byte at the head of the file.
+        // If this file was created with a larger capacity than our default, use that instead.
+        if (existingFileSize > minimumFileSize) {
+            this.capacity = existingFileSize;
+            this.dataCapacity = existingFileSize - HEADER_SIZE;
+        } else {
+            this.capacity = minimumFileSize;
+            this.dataCapacity = minimumFileSize - HEADER_SIZE;
+        }
     }
 
+    /**
+     * Unlike the constructor, which treats its size argument as a file size, this factory method
+     * takes a data capacity and creates a page file large enough to hold that much element data
+     * once the file header is accounted for.
+     *
+     * @param pageNum the page number
+     * @param pageDataCapacity the number of data bytes the page must be able to hold
+     * @param dirPath the directory holding the page files
+     * @return a new MmapPageIO whose file is exactly pageDataCapacity + HEADER_SIZE bytes
+     */
+    public static MmapPageIO makeExactlySizedPage(int pageNum, int pageDataCapacity, Path dirPath) {
+        return new MmapPageIO(pageNum, pageDataCapacity + HEADER_SIZE, dirPath);
+    }
+
     @Override
     public void open(long minSeqNum, int elementCount) throws IOException {
         mapFile();
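
A compact, standalone sketch of the constructor's sizing rule above (illustrative only; file I/O is replaced by a plain parameter so the rule can be run in isolation):

public class CapacityRuleSketch {
    static final int HEADER_SIZE = 1; // one version byte, as in MmapPageIO

    // Returns {capacity, dataCapacity} for a page whose backing file currently
    // holds existingFileSize bytes (0 if absent), opened with minimumFileSize.
    static int[] capacities(int existingFileSize, int minimumFileSize) {
        if (existingFileSize > minimumFileSize) {
            return new int[] { existingFileSize, existingFileSize - HEADER_SIZE };
        }
        return new int[] { minimumFileSize, minimumFileSize - HEADER_SIZE };
    }

    public static void main(String[] args) {
        // Fresh page with the default 1000-byte file: 999 usable data bytes.
        System.out.println(java.util.Arrays.toString(capacities(0, 1000)));    // [1000, 999]
        // Re-opening an oversized 2001-byte page file: its on-disk size wins.
        System.out.println(java.util.Arrays.toString(capacities(2001, 1000))); // [2001, 2000]
    }
}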
@@ -227,10 +260,15 @@ public void write(final List<byte[]> bytesList, final long minSeqNum) {
 
             newOffsets[i] = mainBufferOffset;
 
-            buffer.putLong(seqNum);
-            buffer.putInt(bytes.length);
-            buffer.put(bytes);
-            buffer.putInt(checksum(bytes));
+            try {
+                buffer.putLong(seqNum);
+                buffer.putInt(bytes.length);
+                buffer.put(bytes);
+                buffer.putInt(checksum(bytes));
+            } catch (BufferOverflowException e) {
+                e.printStackTrace();
+                throw e;
+            }
 
             mainBufferOffset += persistedByteCount(bytes.length, 1);
         }
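
The four puts above define the per-element record layout: an 8-byte sequence number, a 4-byte length, the payload, and a 4-byte CRC32 checksum. A runnable sketch of that layout, using a heap ByteBuffer in place of the commit's memory-mapped buffer:

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class RecordLayoutSketch {
    public static void main(String[] args) {
        byte[] payload = "hello".getBytes();
        CRC32 crc = new CRC32();
        crc.update(payload);

        // 8 (seqNum) + 4 (length) + payload + 4 (checksum), matching METADATA_SIZE = 16.
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + payload.length + 4);
        buf.putLong(1L);                  // sequence number
        buf.putInt(payload.length);       // payload length
        buf.put(payload);                 // payload bytes
        buf.putInt((int) crc.getValue()); // CRC32 checksum
        System.out.println("record bytes: " + buf.position()); // 21 for "hello"
    }
}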
@@ -308,7 +346,7 @@ public boolean hasSpace(int bytes) {
 
     @Override
     public int persistedByteCount(int byteCount, int numElements) {
-        int metadataBytes = (SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE) * numElements;
+        int metadataBytes = METADATA_SIZE * numElements;
         return byteCount + metadataBytes;
     }

39 changes: 31 additions & 8 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -17,6 +17,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -96,18 +97,40 @@ public void singleWriteRead() throws IOException {
     public void batchWriteRead() throws IOException {
         // We need a fairly large size because reads don't span pages
         try (Queue q = new Queue(TestSettings.persistedQueueSettings(1000, dataPath))) {
-            q.open();
-
-            long res = q.write(batch);
-
-            Batch b = q.nonBlockReadBatch(batch.size());
-
-            assertThat(b.getElements().size(), is(batch.size()));
-            for (int i = 0; i < batch.size(); i++) {
-                assertThat(b.getElements().get(i).toString(), is(batch.get(i).toString()));
-            }
-            assertThat(q.nonBlockReadBatch(1), nullValue());
+            assertThatIsReadableAllAtOnce(batch, q);
+        }
+    }
+
+    @Test
+    public void batchWritePageLargerThanConfiguredPageCapacity() throws IOException {
+        List<Queueable> tooLargeBatch = new ArrayList<>();
+        int batchSize = 0;
+        int pageCapacity = 1000;
+        int i = 0;
+        while (batchSize < pageCapacity) {
+            Queueable element = new StringElement("Hello There " + i);
+            tooLargeBatch.add(element);
+            batchSize += element.serialize().length;
+            i++;
+        }
+
+        try (Queue q = new Queue(TestSettings.persistedQueueSettings(pageCapacity, dataPath))) {
+            assertThatIsReadableAllAtOnce(tooLargeBatch, q);
+        }
+    }
+
+    private void assertThatIsReadableAllAtOnce(List<Queueable> tooLargeBatch, Queue q) throws IOException {
+        q.open();
+
+        long res = q.write(tooLargeBatch);
+
+        Batch b = q.nonBlockReadBatch(tooLargeBatch.size());
+
+        assertThat(b.getElements().size(), is(tooLargeBatch.size()));
+        for (int i = 0; i < tooLargeBatch.size(); i++) {
+            assertThat(b.getElements().get(i).toString(), is(tooLargeBatch.get(i).toString()));
+        }
+        assertThat(q.nonBlockReadBatch(1), nullValue());
     }
 
     /**
