[client] RecordAccumulator's ready method will be blocked if no batch is ready.
loserwang1024 committed Jan 13, 2025
1 parent e730507 commit 8b54e99
Showing 4 changed files with 131 additions and 49 deletions.
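
The heart of the change is a ReentrantLock/Condition pair inside RecordAccumulator: when ready() finds no node with a sendable batch it parks on the condition for up to the given timeout, and writer threads signal the condition after an append makes a batch sendable (close() signals all waiters). A minimal, self-contained sketch of that wait/notify pattern is below; the class and method bodies are illustrative only, not the actual Fluss implementation.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; not the actual Fluss RecordAccumulator.
final class BlockingReadySketch {
    private final ReentrantLock batchReadyLock = new ReentrantLock();
    private final Condition batchIsReady = batchReadyLock.newCondition();

    // Sender side: park until signalled or until timeoutMs elapses.
    void waitBatchReady(long timeoutMs) throws InterruptedException {
        batchReadyLock.lock();
        try {
            if (timeoutMs > 0) {
                // await() releases the lock while waiting and re-acquires it on wake-up.
                batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            batchReadyLock.unlock();
        }
    }

    // Writer side: wake the sender once an append has made a batch sendable.
    void notifyBatchReady() {
        batchReadyLock.lock();
        try {
            batchIsReady.signal();
        } finally {
            batchReadyLock.unlock();
        }
    }
}

In the real accumulator the lock handling goes through LockUtils.inLock, and notifyBatchReady(deque) re-checks batchReady(deque) under the lock so writers only signal when the head batch is actually sendable; close() signals all waiters so shutdown is never stuck behind the timeout.
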
RecordAccumulator.java
@@ -47,6 +47,7 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -58,9 +59,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -116,6 +121,11 @@ public final class RecordAccumulator {

private final IdempotenceManager idempotenceManager;

private final ReentrantLock batchReadyLock = new ReentrantLock();

@GuardedBy("lock")
private final Condition batchIsReady = batchReadyLock.newCondition();

// TODO add retryBackoffMs to retry the produce request upon receiving an error.
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
// TODO add nextBatchExpiryTimeMs
@@ -257,6 +267,10 @@ public RecordAppendResult append(
* </pre>
*/
public ReadyCheckResult ready(Cluster cluster) {
return ready(cluster, batchTimeoutMs);
}

public ReadyCheckResult ready(Cluster cluster, long timeoutMs) {
Set<ServerNode> readyNodes = new HashSet<>();
Set<PhysicalTablePath> unknownLeaderTables = new HashSet<>();
// Go table by table so that we can get queue sizes for buckets in a table and calculate
@@ -269,6 +283,9 @@ public ReadyCheckResult ready(Cluster cluster) {
readyNodes,
unknownLeaderTables,
cluster));
if (readyNodes.isEmpty()) {
waitBatchReady(timeoutMs);
}

// TODO and the earliest time at which any non-send-able bucket will be ready;

@@ -436,59 +453,59 @@ private void bucketReady(
Map<Integer, Deque<WriteBatch>> batches = bucketAndWriteBatches.batches;
// Collect the queue sizes for available buckets to be used in adaptive bucket allocate.

boolean exhausted = writerMemoryBuffer.queued() > 0 || memorySegmentPool.queued() > 0;
batches.forEach(
(bucketId, deque) -> {
TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
ServerNode leader = cluster.leaderFor(tableBucket);
final long waitedTimeMs;
final int dequeSize;
final boolean full;

// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deque are often empty in this path, esp with large bucket counts,
// so we exit early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
return;
}

waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
dequeSize = deque.size();
full = dequeSize > 1 || batch.isClosed();
}

if (leader == null) {
// This is a bucket for which leader is not known, but messages are
// available to send. Note that entries are currently not removed from
// batches when deque is empty.
unknownLeaderTables.add(physicalTablePath);
} else {
batchReady(exhausted, leader, waitedTimeMs, full, readyNodes);
batchReady(deque, leader, readyNodes);
}
});
}

private void batchReady(
boolean exhausted,
ServerNode leader,
long waitedTimeMs,
boolean full,
Set<ServerNode> readyNodes) {
Deque<WriteBatch> deque, ServerNode leader, Set<ServerNode> readyNodes) {
if (!readyNodes.contains(leader)) {
// if the wait time larger than lingerMs, we can send this batch even if it is not full.
boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
boolean sendAble = full || expired || exhausted || closed || flushInProgress();
if (sendAble) {
if (batchReady(deque)) {
readyNodes.add(leader);
}
}
}

private boolean batchReady(Deque<WriteBatch> deque) {
final long waitedTimeMs;
final int dequeSize;
final boolean full;

boolean exhausted = writerMemoryBuffer.queued() > 0 || memorySegmentPool.queued() > 0;
// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deque are often empty in this path, esp with large bucket counts,
// so we exit early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
return false;
}

waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
dequeSize = deque.size();
full = dequeSize > 1 || batch.isClosed();
}
// if the wait time larger than lingerMs, we can send this batch even if it is not full.
boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
return full || expired || exhausted || closed || flushInProgress();
}

/**
* Are there any threads currently waiting on a flush?
*
@@ -573,6 +590,7 @@ private RecordAppendResult tryAppend(
WriteBatch last = deque.peekLast();
if (last != null) {
boolean success = last.tryAppend(writeRecord, callback);
notifyBatchReady(deque);
if (!success) {
last.close();
batchesToBuild.add(last);
@@ -812,6 +830,30 @@ private void insertInSequenceOrder(Deque<WriteBatch> deque, WriteBatch batch) {
}
}

private void notifyBatchReady(Deque<WriteBatch> deque) {
inLock(
batchReadyLock,
() -> {
if (batchReady(deque)) {
batchIsReady.signal();
}
});
}

private void waitBatchReady(long timeoutMs) {
try {
inLock(
batchReadyLock,
() -> {
if (timeoutMs > 0) {
batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/** Metadata about a record just appended to the record accumulator. */
public static final class RecordAppendResult {
public final boolean batchIsFull;
@@ -842,7 +884,7 @@ public ReadyCheckResult(
/** Close this accumulator and force all the record buffers to be drained. */
public void close() {
closed = true;

inLock(batchReadyLock, batchIsReady::signalAll);
writerMemoryBuffer.close();
memorySegmentPool.close();

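Note that the original single-argument ready(Cluster) keeps its signature and simply delegates with the default batchTimeoutMs, so existing callers block for at most one batch timeout; callers that need a tighter bound can use the new overload, which the updated tests below do with a 10 ms timeout. A hypothetical caller-side flow, with accumulator and cluster setup omitted and names taken from this diff:

// Hypothetical usage sketch; error handling and metadata refresh omitted.
RecordAccumulator.ReadyCheckResult result = accumulator.ready(cluster, 10L);
if (!result.readyNodes.isEmpty()) {
    // Only drain when at least one leader has a sendable batch.
    Map<Integer, List<WriteBatch>> batchesPerNode =
            accumulator.drain(cluster, result.readyNodes, Integer.MAX_VALUE);
    // hand batchesPerNode to the request-building/send step here
}
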
Sender.java
@@ -201,13 +201,6 @@ private void sendWriteData() throws Exception {
}

Set<ServerNode> readyNodes = readyCheckResult.readyNodes;
if (readyNodes.isEmpty()) {
// TODO The method sendWriteData is in a busy loop. If there is no data continuously, it
// will cause the CPU to be occupied. Currently, we just sleep 1 second to avoid this.
// In the future, we need to introduce delay logic to deal with it.
Thread.sleep(1);
}

// get the list of batches prepare to send.
Map<Integer, List<WriteBatch>> batches =
accumulator.drain(metadataUpdater.getCluster(), readyNodes, maxRequestSize);
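The deleted block was the stop-gap for the busy loop described in its own TODO: when no node was ready, sendWriteData() slept for 1 ms per iteration (Thread.sleep(1)) and kept spinning. With ready() now parking on the accumulator's condition, the loop simply falls through to drain() once a batch becomes sendable or the timeout fires. A rough, heavily simplified sketch of the resulting flow; the send helper at the end is invented for illustration:

// Simplified; the real sendWriteData() also deals with unknown leaders,
// idempotence state and request building.
RecordAccumulator.ReadyCheckResult readyCheckResult =
        accumulator.ready(metadataUpdater.getCluster()); // blocks up to batchTimeoutMs if idle

Set<ServerNode> readyNodes = readyCheckResult.readyNodes;
Map<Integer, List<WriteBatch>> batches =
        accumulator.drain(metadataUpdater.getCluster(), readyNodes, maxRequestSize);
sendBatchesToLeaders(batches); // hypothetical helper standing in for the actual send step
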
RecordAccumulatorTest.java
@@ -58,6 +58,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
@@ -164,7 +165,7 @@ void testFull() throws Exception {
WriteBatch batch = writeBatches.peekFirst();
assertThat(batch.isClosed()).isFalse();
// No buckets should be ready.
assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
}

// this appends doesn't fit in the first batch, so a new batch is created and the first
@@ -175,7 +176,7 @@ void testFull() throws Exception {
Iterator<WriteBatch> bucketBatchesIterator = writeBatches.iterator();
assertThat(bucketBatchesIterator.next().isClosed()).isTrue();
// Bucket's leader should be ready.
assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));

List<WriteBatch> batches =
accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE)
@@ -216,7 +217,7 @@ void testAppendLarge() throws Exception {
// row size > 10;
accum.append(createRecord(row1), writeCallback, cluster, 0, false);
// bucket's leader should be ready for bucket0.
assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));

Deque<WriteBatch> writeBatches = accum.getDeque(DATA1_PHYSICAL_TABLE_PATH, tb1);
assertThat(writeBatches).hasSize(1);
@@ -269,7 +270,7 @@ void testAppendWithStickyBucketAssigner() throws Exception {
// We only appended if we do not retry.
if (!switchBucket) {
appends++;
assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
}
}

@@ -317,7 +318,7 @@ void testPartialDrain() throws Exception {
}
}

assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));
List<WriteBatch> batches =
accum.drain(cluster, Collections.singleton(node1), 1024).get(node1.id());
// Due to size bound only one bucket should have been retrieved.
@@ -334,12 +335,12 @@ void testFlush() throws Exception {
assertThat(accum.hasIncomplete()).isTrue();
}

assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);

accum.beginFlush();
// drain and deallocate all batches.
Map<Integer, List<WriteBatch>> results =
accum.drain(cluster, accum.ready(cluster).readyNodes, Integer.MAX_VALUE);
accum.drain(cluster, accum.ready(cluster, 10).readyNodes, Integer.MAX_VALUE);
assertThat(accum.hasIncomplete()).isTrue();

for (List<WriteBatch> batches : results.values()) {
@@ -367,7 +368,7 @@ void testTableWithUnknownLeader() throws Exception {
cluster = updateCluster(Collections.singletonList(bucket1));

accum.append(createRecord(row), writeCallback, cluster, 0, false);
RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster);
RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster, 10);
assertThat(readyCheckResult.unknownLeaderTables)
.isEqualTo(Collections.singleton(DATA1_PHYSICAL_TABLE_PATH));
assertThat(readyCheckResult.readyNodes.size()).isEqualTo(0);
@@ -378,7 +379,7 @@ void testTableWithUnknownLeader() throws Exception {
// update the bucket info with leader.
cluster = updateCluster(Collections.singletonList(bucket1));

readyCheckResult = accum.ready(cluster);
readyCheckResult = accum.ready(cluster, 10);
assertThat(readyCheckResult.unknownLeaderTables).isEmpty();
assertThat(readyCheckResult.readyNodes.size()).isEqualTo(1);
}
@@ -395,6 +396,41 @@ void testAwaitFlushComplete() throws Exception {
assertThatThrownBy(accum::awaitFlushCompletion).isInstanceOf(InterruptedException.class);
}

@Test
void testReadyWithTimeOut() throws Exception {
// test case: node1(tb1, tb2), node2(tb3).
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
long batchSize = getTestBatchSize(row);
RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, batchSize * 2);

// add bucket into cluster.
cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3, bucket4));

// wait for ready timeout.
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
accum.ready(cluster, 500L);
});
assertThat(future).isNotCompleted();
Thread.sleep(1000L);
assertThat(future).isCompleted();

// wait for ready success.
future =
CompletableFuture.runAsync(
() -> {
accum.ready(cluster, 60 * 60 * 1000L);
});
Thread.sleep(1000L);
assertThat(future).isNotCompleted();
// initial data.
accum.append(createRecord(row), writeCallback, cluster, 0, false);
accum.append(createRecord(row), writeCallback, cluster, 0, false);
Thread.sleep(100L);
assertThat(future).isCompleted();
}

private WriteRecord createRecord(InternalRow row) {
return new WriteRecord(DATA1_PHYSICAL_TABLE_PATH, WriteKind.APPEND, row, null);
}
SenderTest.java
@@ -33,8 +33,11 @@
import com.alibaba.fluss.rpc.protocol.Errors;
import com.alibaba.fluss.server.tablet.TestTabletServerGateway;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
@@ -60,6 +63,7 @@ final class SenderTest {
private static final int REQUEST_TIMEOUT = 5000;
private static final short ACKS_ALL = -1;
private static final int MAX_INFLIGHT_REQUEST_PER_BUCKET = 5;
private static final Logger log = LoggerFactory.getLogger(SenderTest.class);

private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
private TestingMetadataUpdater metadataUpdater;
@@ -76,6 +80,11 @@ public void setup() {
sender = setupWithIdempotenceState();
}

@AfterEach
public void teardown() throws Exception {
sender.forceClose();
}

@Test
void testSimple() throws Exception {
long offset = 0;
@@ -239,15 +248,17 @@ void testIdempotenceWithMaxInflightBatch() throws Exception {
sender1.runOnce();
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(1);
assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
.isEqualTo(1);

// finish the first batch, the latest batch will be drained from the accumulator.
finishIdempotentProduceLogRequest(0, tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
sender1.runOnce(); // receive response 0.
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(0));
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(0);
assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
.isEqualTo(0);
}

@Test
