diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
index 2aed98c12..e53bdae78 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -58,9 +59,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -116,6 +121,11 @@ public final class RecordAccumulator {
 
     private final IdempotenceManager idempotenceManager;
 
+    private final ReentrantLock batchReadyLock = new ReentrantLock();
+
+    @GuardedBy("batchReadyLock")
+    private final Condition batchIsReady = batchReadyLock.newCondition();
+
     // TODO add retryBackoffMs to retry the produce request upon receiving an error.
     // TODO add deliveryTimeoutMs to report success or failure on record delivery.
     // TODO add nextBatchExpiryTimeMs
@@ -257,6 +267,10 @@ public RecordAppendResult append(
      *
      */
     public ReadyCheckResult ready(Cluster cluster) {
+        return ready(cluster, batchTimeoutMs);
+    }
+
+    public ReadyCheckResult ready(Cluster cluster, long timeoutMs) {
         Set<ServerNode> readyNodes = new HashSet<>();
         Set<PhysicalTablePath> unknownLeaderTables = new HashSet<>();
         // Go table by table so that we can get queue sizes for buckets in a table and calculate
@@ -269,6 +283,9 @@ public ReadyCheckResult ready(Cluster cluster) {
                         readyNodes,
                         unknownLeaderTables,
                         cluster));
+        if (readyNodes.isEmpty()) {
+            waitBatchReady(timeoutMs);
+        }
 
         // TODO and the earliest time at which any non-send-able bucket will be ready;
@@ -436,31 +453,10 @@ private void bucketReady(
         Map<Integer, Deque<WriteBatch>> batches = bucketAndWriteBatches.batches;
 
         // Collect the queue sizes for available buckets to be used in adaptive bucket allocate.
-        boolean exhausted = writerMemoryBuffer.queued() > 0 || memorySegmentPool.queued() > 0;
         batches.forEach(
                 (bucketId, deque) -> {
                     TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
                     ServerNode leader = cluster.leaderFor(tableBucket);
-                    final long waitedTimeMs;
-                    final int dequeSize;
-                    final boolean full;
-
-                    // Note: this loop is especially hot with large bucket counts.
-                    // We are careful to only perform the minimum required inside the synchronized
-                    // block, as this lock is also used to synchronize writer threads
-                    // attempting to append() to a bucket/batch.
-                    synchronized (deque) {
-                        // Deque are often empty in this path, esp with large bucket counts,
-                        // so we exit early if we can.
-                        WriteBatch batch = deque.peekFirst();
-                        if (batch == null) {
-                            return;
-                        }
-
-                        waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
-                        dequeSize = deque.size();
-                        full = dequeSize > 1 || batch.isClosed();
-                    }
 
                     if (leader == null) {
                         // This is a bucket for which leader is not known, but messages are
@@ -468,27 +464,48 @@ private void bucketReady(
                         // batches when deque is empty.
                         unknownLeaderTables.add(physicalTablePath);
                     } else {
-                        batchReady(exhausted, leader, waitedTimeMs, full, readyNodes);
+                        batchReady(deque, leader, readyNodes);
                     }
                 });
     }
 
     private void batchReady(
-            boolean exhausted,
-            ServerNode leader,
-            long waitedTimeMs,
-            boolean full,
-            Set<ServerNode> readyNodes) {
+            Deque<WriteBatch> deque, ServerNode leader, Set<ServerNode> readyNodes) {
         if (!readyNodes.contains(leader)) {
             // if the wait time larger than lingerMs, we can send this batch even if it is not full.
-            boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
-            boolean sendAble = full || expired || exhausted || closed || flushInProgress();
-            if (sendAble) {
+            if (batchReady(deque)) {
                 readyNodes.add(leader);
             }
         }
     }
 
+    private boolean batchReady(Deque<WriteBatch> deque) {
+        final long waitedTimeMs;
+        final int dequeSize;
+        final boolean full;
+
+        boolean exhausted = writerMemoryBuffer.queued() > 0 || memorySegmentPool.queued() > 0;
+        // Note: this loop is especially hot with large bucket counts.
+        // We are careful to only perform the minimum required inside the synchronized
+        // block, as this lock is also used to synchronize writer threads
+        // attempting to append() to a bucket/batch.
+        synchronized (deque) {
+            // Deque are often empty in this path, esp with large bucket counts,
+            // so we exit early if we can.
+            WriteBatch batch = deque.peekFirst();
+            if (batch == null) {
+                return false;
+            }
+
+            waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
+            dequeSize = deque.size();
+            full = dequeSize > 1 || batch.isClosed();
+        }
+        // if the wait time is larger than batchTimeoutMs, we can send this batch even if it is not full.
+        boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
+        return full || expired || exhausted || closed || flushInProgress();
+    }
+
     /**
      * Are there any threads currently waiting on a flush?
      *
@@ -573,6 +590,7 @@ private RecordAppendResult tryAppend(
         WriteBatch last = deque.peekLast();
         if (last != null) {
             boolean success = last.tryAppend(writeRecord, callback);
+            notifyBatchReady(deque);
             if (!success) {
                 last.close();
                 batchesToBuild.add(last);
@@ -812,6 +830,30 @@ private void insertInSequenceOrder(Deque<WriteBatch> deque, WriteBatch batch) {
         }
     }
 
+    private void notifyBatchReady(Deque<WriteBatch> deque) {
+        inLock(
+                batchReadyLock,
+                () -> {
+                    if (batchReady(deque)) {
+                        batchIsReady.signal();
+                    }
+                });
+    }
+
+    private void waitBatchReady(long timeoutMs) {
+        try {
+            inLock(
+                    batchReadyLock,
+                    () -> {
+                        if (timeoutMs > 0) {
+                            batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS);
+                        }
+                    });
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /** Metadata about a record just appended to the record accumulator. */
     public static final class RecordAppendResult {
         public final boolean batchIsFull;
@@ -842,7 +884,7 @@ public ReadyCheckResult(
 
     /** Close this accumulator and force all the record buffers to be drained. */
     public void close() {
         closed = true;
-
+        inLock(batchReadyLock, batchIsReady::signalAll);
         writerMemoryBuffer.close();
         memorySegmentPool.close();
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
index c610a1890..5dd95996b 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
@@ -201,13 +201,6 @@ private void sendWriteData() throws Exception {
         }
 
         Set<ServerNode> readyNodes = readyCheckResult.readyNodes;
-        if (readyNodes.isEmpty()) {
-            // TODO The method sendWriteData is in a busy loop. If there is no data continuously, it
-            // will cause the CPU to be occupied. Currently, we just sleep 1 second to avoid this.
-            // In the future, we need to introduce delay logic to deal with it.
-            Thread.sleep(1);
-        }
-
         // get the list of batches prepare to send.
         Map<Integer, List<WriteBatch>> batches =
                 accumulator.drain(metadataUpdater.getCluster(), readyNodes, maxRequestSize);
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java
index 7aff708e6..2fcabfafb 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/RecordAccumulatorTest.java
@@ -58,6 +58,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
@@ -164,7 +165,7 @@ void testFull() throws Exception {
             WriteBatch batch = writeBatches.peekFirst();
             assertThat(batch.isClosed()).isFalse();
             // No buckets should be ready.
-            assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
+            assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
         }
 
         // this appends doesn't fit in the first batch, so a new batch is created and the first
@@ -175,7 +176,7 @@ void testFull() throws Exception {
         Iterator<WriteBatch> bucketBatchesIterator = writeBatches.iterator();
         assertThat(bucketBatchesIterator.next().isClosed()).isTrue();
         // Bucket's leader should be ready.
-        assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
+        assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));
 
         List<WriteBatch> batches =
                 accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE)
@@ -216,7 +217,7 @@ void testAppendLarge() throws Exception {
         // row size > 10;
         accum.append(createRecord(row1), writeCallback, cluster, 0, false);
         // bucket's leader should be ready for bucket0.
-        assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
+        assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));
 
         Deque<WriteBatch> writeBatches = accum.getDeque(DATA1_PHYSICAL_TABLE_PATH, tb1);
         assertThat(writeBatches).hasSize(1);
@@ -269,7 +270,7 @@ void testAppendWithStickyBucketAssigner() throws Exception {
             // We only appended if we do not retry.
             if (!switchBucket) {
                 appends++;
-                assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
+                assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
             }
         }
 
@@ -317,7 +318,7 @@ void testPartialDrain() throws Exception {
             }
         }
 
-        assertThat(accum.ready(cluster).readyNodes).isEqualTo(Collections.singleton(node1));
+        assertThat(accum.ready(cluster, 10).readyNodes).isEqualTo(Collections.singleton(node1));
         List<WriteBatch> batches =
                 accum.drain(cluster, Collections.singleton(node1), 1024).get(node1.id());
         // Due to size bound only one bucket should have been retrieved.
@@ -334,12 +335,12 @@ void testFlush() throws Exception {
             assertThat(accum.hasIncomplete()).isTrue();
         }
 
-        assertThat(accum.ready(cluster).readyNodes.size()).isEqualTo(0);
+        assertThat(accum.ready(cluster, 10).readyNodes.size()).isEqualTo(0);
 
         accum.beginFlush();
         // drain and deallocate all batches.
         Map<Integer, List<WriteBatch>> results =
-                accum.drain(cluster, accum.ready(cluster).readyNodes, Integer.MAX_VALUE);
+                accum.drain(cluster, accum.ready(cluster, 10).readyNodes, Integer.MAX_VALUE);
         assertThat(accum.hasIncomplete()).isTrue();
 
         for (List<WriteBatch> batches : results.values()) {
@@ -367,7 +368,7 @@ void testTableWithUnknownLeader() throws Exception {
         cluster = updateCluster(Collections.singletonList(bucket1));
 
         accum.append(createRecord(row), writeCallback, cluster, 0, false);
-        RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster);
+        RecordAccumulator.ReadyCheckResult readyCheckResult = accum.ready(cluster, 10);
         assertThat(readyCheckResult.unknownLeaderTables)
                 .isEqualTo(Collections.singleton(DATA1_PHYSICAL_TABLE_PATH));
         assertThat(readyCheckResult.readyNodes.size()).isEqualTo(0);
@@ -378,7 +379,7 @@ void testTableWithUnknownLeader() throws Exception {
 
         // update the bucket info with leader.
         cluster = updateCluster(Collections.singletonList(bucket1));
-        readyCheckResult = accum.ready(cluster);
+        readyCheckResult = accum.ready(cluster, 10);
         assertThat(readyCheckResult.unknownLeaderTables).isEmpty();
         assertThat(readyCheckResult.readyNodes.size()).isEqualTo(1);
     }
@@ -395,6 +396,41 @@ void testAwaitFlushComplete() throws Exception {
         assertThatThrownBy(accum::awaitFlushCompletion).isInstanceOf(InterruptedException.class);
     }
 
+    @Test
+    void testReadyWithTimeOut() throws Exception {
+        // test case: node1(tb1, tb2), node2(tb3).
+        IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
+        long batchSize = getTestBatchSize(row);
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, batchSize * 2);
+
+        // add bucket into cluster.
+        cluster = updateCluster(Arrays.asList(bucket1, bucket2, bucket3, bucket4));
+
+        // wait for ready timeout.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> {
+                            accum.ready(cluster, 500L);
+                        });
+        assertThat(future).isNotCompleted();
+        Thread.sleep(1000L);
+        assertThat(future).isCompleted();
+
+        // wait for ready success.
+        future =
+                CompletableFuture.runAsync(
+                        () -> {
+                            accum.ready(cluster, 60 * 60 * 1000L);
+                        });
+        Thread.sleep(1000L);
+        assertThat(future).isNotCompleted();
+        // append data so that a batch becomes ready and wakes up the waiting ready() call.
+        accum.append(createRecord(row), writeCallback, cluster, 0, false);
+        accum.append(createRecord(row), writeCallback, cluster, 0, false);
+        Thread.sleep(100L);
+        assertThat(future).isCompleted();
+    }
+
     private WriteRecord createRecord(InternalRow row) {
         return new WriteRecord(DATA1_PHYSICAL_TABLE_PATH, WriteKind.APPEND, row, null);
     }
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java
index 6c4d899c7..dd0f7ffbd 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/SenderTest.java
@@ -33,8 +33,11 @@ import com.alibaba.fluss.rpc.protocol.Errors;
 import com.alibaba.fluss.server.tablet.TestTabletServerGateway;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -60,6 +63,7 @@ final class SenderTest {
     private static final int REQUEST_TIMEOUT = 5000;
     private static final short ACKS_ALL = -1;
     private static final int MAX_INFLIGHT_REQUEST_PER_BUCKET = 5;
+    private static final Logger log = LoggerFactory.getLogger(SenderTest.class);
 
     private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 0);
     private TestingMetadataUpdater metadataUpdater;
@@ -76,6 +80,11 @@ public void setup() {
         sender = setupWithIdempotenceState();
     }
 
+    @AfterEach
+    public void teardown() throws Exception {
+        sender.forceClose();
+    }
+
     @Test
     void testSimple() throws Exception {
         long offset = 0;
@@ -239,7 +248,8 @@ void testIdempotenceWithMaxInflightBatch() throws Exception {
         sender1.runOnce();
         assertThat(idempotenceManager.inflightBatchSize(tb1))
                 .isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
-        assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(1);
+        assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
+                .isEqualTo(1);
 
         // finish the first batch, the latest batch will be drained from the accumulator.
         finishIdempotentProduceLogRequest(0, tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
@@ -247,7 +257,8 @@ void testIdempotenceWithMaxInflightBatch() throws Exception {
         assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(0));
         assertThat(idempotenceManager.inflightBatchSize(tb1))
                 .isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
-        assertThat(accumulator.ready(metadataUpdater.getCluster()).readyNodes.size()).isEqualTo(0);
+        assertThat(accumulator.ready(metadataUpdater.getCluster(), 10).readyNodes.size())
+                .isEqualTo(0);
     }
 
     @Test
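Reviewer note (not part of the patch): the change replaces the Sender's fixed Thread.sleep(1) busy loop with a ReentrantLock/Condition pair, so RecordAccumulator#ready(Cluster, long) now parks until a writer thread signals that a batch may have become sendable, or until the timeout elapses, and close() wakes every waiter via signalAll(). The sketch below only illustrates that wait/notify pattern; the class and method names (BatchReadySignal, awaitBatchReady, notifyBatchReady, isBatchReady) are hypothetical, and it uses explicit lock()/unlock() instead of Fluss's LockUtils.inLock helper.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Minimal sketch (hypothetical names) of the Condition-based wait used above:
// the ready-check thread parks instead of polling, and writer threads wake it
// only when a batch may actually be sendable.
public class BatchReadySignal {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition batchIsReady = lock.newCondition();

    /** Called by the ready-check path when no node is currently ready. */
    public void awaitBatchReady(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            if (timeoutMs > 0) {
                // May return early on a spurious wakeup; the caller re-runs the ready check anyway.
                batchIsReady.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Called by writer threads after an append that may have made a batch sendable. */
    public void notifyBatchReady(BooleanSupplier isBatchReady) {
        lock.lock();
        try {
            if (isBatchReady.getAsBoolean()) {
                batchIsReady.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Called on close so that a blocked ready() call returns immediately. */
    public void wakeAll() {
        lock.lock();
        try {
            batchIsReady.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

As in the patch, the notifying side re-evaluates the ready condition while holding the same lock before calling signal(), so appends that do not make a batch sendable do not wake the ready-check thread; a timed-out or spurious wakeup is harmless because ready() simply returns an empty node set and the Sender loops again.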