Skip to content

Commit

Permalink
[client] Unify Memory Allocation with MemorySegmentPool for Code Cons…
Browse files Browse the repository at this point in the history
…istency (#290)

- Unified Memory Allocation: Replaced `WriterBufferPool` with `MemorySegmentPool` to ensure code consistency across the project.

- Removed `trySerialize()` from `WriteBatch`: Eliminated the `trySerialize()`` method on `WriteBatch`. The fallback logic for handling memory request timeouts is no longer necessary because all batching memories are pre-allocated before the WriteBatch is created. In the rare case where pre-allocated memory is insufficient, we allocate from the heap directly.

- Removed `serialize()` from `WriteBatch`: Moved the serialization logic from `serialize()` to `build()` in `WriteBatch`. This simplification addresses potential concurrency issues and streamlines the process. The original purpose of serialize() was to release the ArrowWriter as soon as possible, which occurred when `WriteBatch#close()`. Now, this release happens in `WriteBatch#build()`, which is called when sending RPC. This change should not introduce significant problems, as the ArrowWriter is reused.

- Added `buffer_usage_ratio` for `ArrowWriter`: Introduced a `buffer_usage_ratio` for `ArrowWriter` to prevent excessive memory buffer usage. This reduces the likelihood of heap allocation, thereby minimizing GC overhead. If heap allocation does occur, it is still manageable and does not pose a major issue.
  • Loading branch information
wuchong committed Jan 14, 2025
1 parent 71f97f1 commit d0b466f
Show file tree
Hide file tree
Showing 57 changed files with 967 additions and 980 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.AbstractPagedOutputView;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.memory.MemorySegmentPool;
import com.alibaba.fluss.memory.PreAllocatedManagedPagedOutputView;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.MemoryLogRecordsArrowBuilder;
import com.alibaba.fluss.record.RowKind;
import com.alibaba.fluss.record.bytesview.BytesView;
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.arrow.ArrowWriter;
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
Expand All @@ -37,7 +34,6 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/**
* A batch of log records managed in ARROW format that is or will be sent to server by {@link
Expand All @@ -50,22 +46,17 @@
public class ArrowLogWriteBatch extends WriteBatch {
private final MemoryLogRecordsArrowBuilder recordsBuilder;
private final AbstractPagedOutputView outputView;
private final List<MemorySegment> preAllocatedMemorySegments;

public ArrowLogWriteBatch(
TableBucket tableBucket,
PhysicalTablePath physicalTablePath,
int schemaId,
ArrowWriter arrowWriter,
List<MemorySegment> preAllocatedMemorySegments,
MemorySegmentPool memorySegmentSource) {
AbstractPagedOutputView outputView) {
super(tableBucket, physicalTablePath);
this.outputView =
new PreAllocatedManagedPagedOutputView(
preAllocatedMemorySegments, memorySegmentSource);
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView);
this.preAllocatedMemorySegments = preAllocatedMemorySegments;
}

@Override
Expand All @@ -88,20 +79,6 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
}
}

@Override
public void serialize() {
try {
recordsBuilder.serialize();
} catch (IOException e) {
throw new FlussRuntimeException("Failed to serialize Arrow batch to memory buffer.", e);
}
}

@Override
public boolean trySerialize() {
return recordsBuilder.trySerialize();
}

@Override
public BytesView build() {
try {
Expand All @@ -128,16 +105,8 @@ public int sizeInBytes() {
}

@Override
public List<MemorySegment> memorySegments() {
List<MemorySegment> usedMemorySegments =
outputView.getSegmentBytesViewList().stream()
.map(MemorySegmentBytesView::getMemorySegment)
.collect(Collectors.toList());
if (usedMemorySegments.size() > preAllocatedMemorySegments.size()) {
return usedMemorySegments;
} else {
return preAllocatedMemorySegments;
}
public List<MemorySegment> pooledMemorySegments() {
return outputView.allocatedPooledSegments();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@
package com.alibaba.fluss.client.write;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.AbstractPagedOutputView;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
import com.alibaba.fluss.record.RowKind;
import com.alibaba.fluss.record.bytesview.BytesView;
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
import com.alibaba.fluss.utils.Preconditions;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -46,14 +43,19 @@
@NotThreadSafe
@Internal
public final class IndexedLogWriteBatch extends WriteBatch {
private final AbstractPagedOutputView outputView;
private final MemoryLogRecordsIndexedBuilder recordsBuilder;

public IndexedLogWriteBatch(
TableBucket tableBucket,
PhysicalTablePath physicalTablePath,
MemoryLogRecordsIndexedBuilder recordsBuilder) {
int schemaId,
int writeLimit,
AbstractPagedOutputView outputView) {
super(tableBucket, physicalTablePath);
this.recordsBuilder = recordsBuilder;
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, outputView);
}

@Override
Expand All @@ -77,34 +79,14 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
}

@Override
public void serialize() {
// do nothing, records are serialized into memory buffer when appending
}

@Override
public boolean trySerialize() {
// records have been serialized.
return true;
}

@VisibleForTesting
public MemoryLogRecords records() {
public BytesView build() {
try {
return recordsBuilder.build();
} catch (IOException e) {
throw new FlussRuntimeException("build memory log records failed", e);
throw new FlussRuntimeException("Failed to build indexed log record batch.", e);
}
}

@Override
public BytesView build() {
MemoryLogRecords memoryLogRecords = records();
return new MemorySegmentBytesView(
memoryLogRecords.getMemorySegment(),
memoryLogRecords.getPosition(),
memoryLogRecords.sizeInBytes());
}

@Override
public boolean isClosed() {
return recordsBuilder.isClosed();
Expand All @@ -117,8 +99,8 @@ public void close() throws Exception {
}

@Override
public List<MemorySegment> memorySegments() {
return Collections.singletonList(recordsBuilder.getMemorySegment());
public List<MemorySegment> pooledMemorySegments() {
return outputView.allocatedPooledSegments();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package com.alibaba.fluss.client.write;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.AbstractPagedOutputView;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.metadata.KvFormat;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
import com.alibaba.fluss.record.KvRecordBatchBuilder;
import com.alibaba.fluss.record.bytesview.BytesView;
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.rpc.messages.PutKvRequest;
import com.alibaba.fluss.utils.Preconditions;
Expand All @@ -34,7 +34,6 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -45,23 +44,30 @@
@NotThreadSafe
@Internal
public class KvWriteBatch extends WriteBatch {
private final DefaultKvRecordBatch.Builder recordsBuilder;
private final AbstractPagedOutputView outputView;
private final KvRecordBatchBuilder recordsBuilder;
private final @Nullable int[] targetColumns;

public KvWriteBatch(
TableBucket tableBucket,
PhysicalTablePath physicalTablePath,
DefaultKvRecordBatch.Builder recordsBuilder,
int[] targetColumns) {
int schemaId,
KvFormat kvFormat,
int writeLimit,
AbstractPagedOutputView outputView,
@Nullable int[] targetColumns) {
super(tableBucket, physicalTablePath);
this.recordsBuilder = recordsBuilder;
this.outputView = outputView;
this.recordsBuilder =
KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat);
this.targetColumns = targetColumns;
}

@Override
public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
// currently, we throw exception directly when the target columns of the write record is
// not the same as the current target columns in the batch
// not the same as the current target columns in the batch.
// this should be quite fast as they should be the same objects.
if (!Arrays.equals(targetColumns, writeRecord.getTargetColumns())) {
throw new IllegalStateException(
String.format(
Expand All @@ -84,38 +90,18 @@ public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws
}
}

@Override
public void serialize() {
// do nothing, records are serialized into memory buffer when appending
}

@Override
public boolean trySerialize() {
// records have been serialized.
return true;
}

@VisibleForTesting
public DefaultKvRecordBatch records() {
try {
return recordsBuilder.build();
} catch (IOException e) {
throw new FlussRuntimeException("Failed to build record batch.", e);
}
}

@Nullable
public int[] getTargetColumns() {
return targetColumns;
}

@Override
public BytesView build() {
DefaultKvRecordBatch recordBatch = records();
return new MemorySegmentBytesView(
recordBatch.getMemorySegment(),
recordBatch.getPosition(),
recordBatch.sizeInBytes());
try {
return recordsBuilder.build();
} catch (IOException e) {
throw new FlussRuntimeException("Failed to build kv record batch.", e);
}
}

@Override
Expand All @@ -135,8 +121,8 @@ public int sizeInBytes() {
}

@Override
public List<MemorySegment> memorySegments() {
return Collections.singletonList(recordsBuilder.getMemorySegment());
public List<MemorySegment> pooledMemorySegments() {
return outputView.allocatedPooledSegments();
}

@Override
Expand Down
Loading

0 comments on commit d0b466f

Please sign in to comment.