Skip to content

Commit

Permalink
[client] RecordAccumulator's ready method will sleep for nextReadyChe…
Browse files Browse the repository at this point in the history
…ckDelayMs if no batch is ready.
  • Loading branch information
loserwang1024 committed Jan 22, 2025
1 parent 2658ea0 commit 8083e64
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public ArrowLogWriteBatch(
PhysicalTablePath physicalTablePath,
int schemaId,
ArrowWriter arrowWriter,
AbstractPagedOutputView outputView) {
super(tableBucket, physicalTablePath);
AbstractPagedOutputView outputView,
long createdMs) {
super(tableBucket, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public IndexedLogWriteBatch(
PhysicalTablePath physicalTablePath,
int schemaId,
int writeLimit,
AbstractPagedOutputView outputView) {
super(tableBucket, physicalTablePath);
AbstractPagedOutputView outputView,
long createdMs) {
super(tableBucket, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public KvWriteBatch(
KvFormat kvFormat,
int writeLimit,
AbstractPagedOutputView outputView,
@Nullable int[] targetColumns) {
super(tableBucket, physicalTablePath);
@Nullable int[] targetColumns,
long createdMs) {
super(tableBucket, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.alibaba.fluss.utils.CopyOnWriteMap;
import com.alibaba.fluss.utils.MathUtils;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.clock.Clock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,6 +107,7 @@ public final class RecordAccumulator {
private final Map<Integer, Integer> nodesDrainIndex;

private final IdempotenceManager idempotenceManager;
private final Clock clock;

// TODO add retryBackoffMs to retry the produce request upon receiving an error.
// TODO add deliveryTimeoutMs to report success or failure on record delivery.
Expand All @@ -114,7 +116,8 @@ public final class RecordAccumulator {
RecordAccumulator(
Configuration conf,
IdempotenceManager idempotenceManager,
WriterMetricGroup writerMetricGroup) {
WriterMetricGroup writerMetricGroup,
Clock clock) {
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
Expand All @@ -133,6 +136,7 @@ public final class RecordAccumulator {
this.incomplete = new IncompleteBatches();
this.nodesDrainIndex = new HashMap<>();
this.idempotenceManager = idempotenceManager;
this.clock = clock;
registerMetrics(writerMetricGroup);
}

Expand Down Expand Up @@ -233,21 +237,26 @@ public RecordAppendResult append(
*/
public ReadyCheckResult ready(Cluster cluster) {
Set<ServerNode> readyNodes = new HashSet<>();
// Start from batchTimeoutMs; the per-table checks below only ever lower this value
// (batchReady takes the minimum with the remaining wait time of un-sendable batches).
long nextReadyCheckDelayMs = batchTimeoutMs;
Set<PhysicalTablePath> unknownLeaderTables = new HashSet<>();
// Go table by table so that we can get queue sizes for buckets in a table and calculate
// cumulative frequency table (used in bucket assigner).
writeBatches.forEach(
(tablePath, bucketAndWriteBatches) ->
bucketReady(
tablePath,
bucketAndWriteBatches,
readyNodes,
unknownLeaderTables,
cluster));

// Thread the running delay through every table so that the smallest remaining wait time
// across all tables is the one that ends up in the result.
for (Map.Entry<PhysicalTablePath, BucketAndWriteBatches> writeBatchesEntry :
writeBatches.entrySet()) {
nextReadyCheckDelayMs =
bucketReady(
writeBatchesEntry.getKey(),
writeBatchesEntry.getValue(),
readyNodes,
unknownLeaderTables,
cluster,
nextReadyCheckDelayMs);
}

// TODO and the earliest time at which any non-send-able bucket will be ready;
// NOTE(review): nextReadyCheckDelayMs returned below looks like it covers this TODO --
// confirm and remove the TODO if so.

return new ReadyCheckResult(readyNodes, unknownLeaderTables);
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTables);
}

/**
Expand Down Expand Up @@ -393,66 +402,88 @@ private List<MemorySegment> allocateMemorySegments(
}

/**
 * Checks, for every bucket of the given table, whether the batch at the head of its deque is
 * ready to be sent.
 *
 * <p>Buckets with an empty deque are skipped. If a bucket has a pending batch but no known
 * leader, the table is added to {@code unknownLeaderTables}; otherwise {@code batchReady}
 * decides whether the leader is added to {@code readyNodes}.
 *
 * @param physicalTablePath the table whose buckets are inspected
 * @param bucketAndWriteBatches the per-bucket batch deques of that table
 * @param readyNodes output set collecting leaders that have sendable data
 * @param unknownLeaderTables output set collecting tables with pending data but no known leader
 * @param cluster cluster view used to resolve the table bucket and its leader
 * @param nextReadyCheckDelayMs the delay computed so far, in milliseconds
 * @return the given delay, lowered to the smallest remaining wait time of any un-sendable batch
 *     seen in this table
 */
private void bucketReady(
private long bucketReady(
PhysicalTablePath physicalTablePath,
BucketAndWriteBatches bucketAndWriteBatches,
Set<ServerNode> readyNodes,
Set<PhysicalTablePath> unknownLeaderTables,
Cluster cluster) {
Cluster cluster,
long nextReadyCheckDelayMs) {
Map<Integer, Deque<WriteBatch>> batches = bucketAndWriteBatches.batches;
// Collect the queue sizes for available buckets to be used in adaptive bucket allocate.
// NOTE(review): no queue sizes are collected in the code visible here -- confirm whether this
// comment is still accurate.

// Writers are queued waiting for buffer memory, so pending batches should be sent.
boolean exhausted = writerBufferPool.queued() > 0;
batches.forEach(
(bucketId, deque) -> {
TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
ServerNode leader = cluster.leaderFor(tableBucket);
final long waitedTimeMs;
final int dequeSize;
final boolean full;

// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deque are often empty in this path, esp with large bucket counts,
// so we exit early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
return;
}

waitedTimeMs = batch.waitedTimeMs(System.currentTimeMillis());
dequeSize = deque.size();
full = dequeSize > 1 || batch.isClosed();
}
for (Map.Entry<Integer, Deque<WriteBatch>> entry : batches.entrySet()) {
int bucketId = entry.getKey();
Deque<WriteBatch> deque = entry.getValue();

TableBucket tableBucket = cluster.getTableBucket(physicalTablePath, bucketId);
ServerNode leader = cluster.leaderFor(tableBucket);
final long waitedTimeMs;
final int dequeSize;
final boolean full;

// Note: this loop is especially hot with large bucket counts.
// We are careful to only perform the minimum required inside the synchronized
// block, as this lock is also used to synchronize writer threads
// attempting to append() to a bucket/batch.
synchronized (deque) {
// Deques are often empty in this path, especially with large bucket counts,
// so we move on to the next bucket early if we can.
WriteBatch batch = deque.peekFirst();
if (batch == null) {
continue;
}

if (leader == null) {
// This is a bucket for which leader is not known, but messages are
// available to send. Note that entries are currently not removed from
// batches when deque is empty.
unknownLeaderTables.add(physicalTablePath);
} else {
batchReady(exhausted, leader, waitedTimeMs, full, readyNodes);
}
});
// Use the injected clock (not System.currentTimeMillis()) to measure the wait time.
waitedTimeMs = batch.waitedTimeMs(clock.milliseconds());
dequeSize = deque.size();
// More than one batch queued, or the head batch is closed: treat the bucket as full.
full = dequeSize > 1 || batch.isClosed();
}

if (leader == null) {
// This is a bucket for which the leader is not known, but messages are
// available to send. Note that entries are currently not removed from
// batches when the deque is empty.
unknownLeaderTables.add(physicalTablePath);
} else {
nextReadyCheckDelayMs =
batchReady(
exhausted,
leader,
waitedTimeMs,
full,
readyNodes,
nextReadyCheckDelayMs);
}
}

return nextReadyCheckDelayMs;
}

/**
 * Decides whether the given leader has sendable data and updates the next ready-check delay.
 *
 * <p>If the leader is already in {@code readyNodes}, nothing is evaluated and the delay is
 * returned unchanged. Otherwise the leader is added to {@code readyNodes} when the batch is
 * full, has waited at least {@code batchTimeoutMs}, the buffer pool is exhausted, the
 * accumulator is closed, or a flush is in progress. If none of these hold, the delay is lowered
 * to the batch's remaining wait time when that is smaller.
 *
 * @param exhausted whether writers are queued waiting for buffer memory
 * @param leader the leader node of the bucket being checked
 * @param waitedTimeMs how long the head batch has waited, in milliseconds
 * @param full whether the bucket is considered full
 * @param readyNodes output set collecting leaders that have sendable data
 * @param nextReadyCheckDelayMs the delay computed so far, in milliseconds
 * @return the (possibly lowered) next ready-check delay, in milliseconds
 */
private void batchReady(
private long batchReady(
boolean exhausted,
ServerNode leader,
long waitedTimeMs,
boolean full,
Set<ServerNode> readyNodes) {
Set<ServerNode> readyNodes,
long nextReadyCheckDelayMs) {
if (!readyNodes.contains(leader)) {
// Once the batch has waited at least batchTimeoutMs, it can be sent even if it is not full.
boolean expired = waitedTimeMs >= (long) batchTimeoutMs;
boolean sendAble = full || expired || exhausted || closed || flushInProgress();
if (sendAble) {
readyNodes.add(leader);
} else {
// Remaining time until this batch expires, clamped so it is never negative.
long timeLeftMs = Math.max(batchTimeoutMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable bucket may
// have a leader that will later be found to have sendable data. However, this is
// good enough since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(nextReadyCheckDelayMs, timeLeftMs);
}
}
return nextReadyCheckDelayMs;
}

/**
Expand Down Expand Up @@ -496,7 +527,8 @@ private RecordAppendResult appendNewBatch(
tableInfo.getTableDescriptor().getKvFormat(),
outputView.getPreAllocatedSize(),
outputView,
writeRecord.getTargetColumns());
writeRecord.getTargetColumns(),
clock.milliseconds());
} else if (writeBatchType == WriteBatch.WriteBatchType.ARROW_LOG) {
ArrowWriter arrowWriter =
arrowWriterPool.getOrCreateWriter(
Expand All @@ -507,15 +539,21 @@ private RecordAppendResult appendNewBatch(
tableInfo.getTableDescriptor().getArrowCompressionInfo());
batch =
new ArrowLogWriteBatch(
tb, physicalTablePath, schemaId, arrowWriter, outputView);
tb,
physicalTablePath,
schemaId,
arrowWriter,
outputView,
clock.milliseconds());
} else {
batch =
new IndexedLogWriteBatch(
tb,
physicalTablePath,
schemaId,
outputView.getPreAllocatedSize(),
outputView);
outputView,
clock.milliseconds());
}

batch.tryAppend(writeRecord, callback);
Expand Down Expand Up @@ -779,11 +817,15 @@ public RecordAppendResult(
/**
 * Result of a readiness check: the set of nodes that have at least one complete record batch in
 * the accumulator, the delay before the next check, and the tables with an unknown leader.
 */
public static final class ReadyCheckResult {
/** Leader nodes that have at least one batch ready to be sent. */
public final Set<ServerNode> readyNodes;
/** Time in milliseconds to wait before readiness should be checked again. */
public final long nextReadyCheckDelayMs;
/** Tables that have pending batches in a bucket whose leader is currently unknown. */
public final Set<PhysicalTablePath> unknownLeaderTables;

public ReadyCheckResult(
Set<ServerNode> readyNodes, Set<PhysicalTablePath> unknownLeaderTables) {
Set<ServerNode> readyNodes,
long nextReadyCheckDelayMs,
Set<PhysicalTablePath> unknownLeaderTables) {
this.readyNodes = readyNodes;
this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
this.unknownLeaderTables = unknownLeaderTables;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ private void sendWriteData() throws Exception {
Set<ServerNode> readyNodes = readyCheckResult.readyNodes;
if (readyNodes.isEmpty()) {
// TODO The method sendWriteData is in a busy loop. If there is no data continuously, it
// will cause the CPU to be occupied. Currently, we just sleep 1 millisecond to avoid this.
// will cause the CPU to be occupied.
// In the future, we need to introduce delay logic to deal with it.
Thread.sleep(1);
Thread.sleep(readyCheckResult.nextReadyCheckDelayMs);
}

// get the list of batches prepare to send.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public abstract class WriteBatch {
protected int recordCount;
private long drainedMs;

public WriteBatch(TableBucket tableBucket, PhysicalTablePath physicalTablePath) {
public WriteBatch(
TableBucket tableBucket, PhysicalTablePath physicalTablePath, long createdMs) {
this.physicalTablePath = physicalTablePath;
this.createdMs = System.currentTimeMillis();
this.createdMs = createdMs;
this.tableBucket = tableBucket;
this.requestFuture = new RequestFuture();
this.recordCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.utils.CopyOnWriteMap;
import com.alibaba.fluss.utils.clock.SystemClock;
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
Expand Down Expand Up @@ -101,7 +102,9 @@ public WriterClient(

short acks = configureAcks(idempotenceManager.idempotenceEnabled());
int retries = configureRetries(idempotenceManager.idempotenceEnabled());
this.accumulator = new RecordAccumulator(conf, idempotenceManager, writerMetricGroup);
this.accumulator =
new RecordAccumulator(
conf, idempotenceManager, writerMetricGroup, SystemClock.getInstance());
this.sender = newSender(acks, retries);
this.ioThreadPool = createThreadPool();
ioThreadPool.submit(sender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
DATA1_TABLE_INFO.getSchemaId(),
maxSizeInBytes,
DATA1_ROW_TYPE,
ArrowCompressionInfo.NO_COMPRESSION),
ArrowCompressionInfo.NO_COMPRESSION,
System.currentTimeMillis()),
new PreAllocatedPagedOutputView(memorySegmentList));
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);

Expand Down Expand Up @@ -181,7 +182,8 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
maxSizeInBytes,
DATA1_ROW_TYPE,
ArrowCompressionInfo.NO_COMPRESSION),
new UnmanagedPagedOutputView(128));
new UnmanagedPagedOutputView(128),
System.currentTimeMillis());
}

private WriteCallback newWriteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ private IndexedLogWriteBatch createLogWriteBatch(
DATA1_PHYSICAL_TABLE_PATH,
DATA1_TABLE_INFO.getSchemaId(),
writeLimit,
new PreAllocatedPagedOutputView(Collections.singletonList(memorySegment)));
new PreAllocatedPagedOutputView(Collections.singletonList(memorySegment)),
System.currentTimeMillis());
}

private void assertDefaultLogRecordBatchEquals(LogRecordBatch recordBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private KvWriteBatch createKvWriteBatch(
KvFormat.COMPACTED,
writeLimit,
outputView,
null);
null,
System.currentTimeMillis());
}

private WriteCallback newWriteCallback() {
Expand Down
Loading

0 comments on commit 8083e64

Please sign in to comment.