Minor: name some constant values in arrow writer
alamb committed Dec 24, 2023
1 parent d5704f7 commit 237e20a
Showing 1 changed file with 9 additions and 2 deletions: datafusion/core/src/datasource/file_format/arrow.rs
@@ -193,6 +193,13 @@ impl DisplayAs for ArrowFileSink {
     }
 }
 
+/// Initial writing buffer size. Note this is just a size hint for efficiency. It
+/// will grow beyond the set value if needed.
+const INITIAL_BUFFER_BYTES: usize = 1048576;
+
+/// If the buffered Arrow data exceeds this size, it is flushed to object store
+const BUFFER_FLUSH_BYTES: usize = 1024000;
+
 #[async_trait]
 impl DataSink for ArrowFileSink {
     fn as_any(&self) -> &dyn Any {
@@ -239,7 +246,7 @@ impl DataSink for ArrowFileSink {
             IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
                 .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            let shared_buffer = SharedBuffer::new(1048576);
+            let shared_buffer = SharedBuffer::new(INITIAL_BUFFER_BYTES);
             let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
                 shared_buffer.clone(),
                 &self.get_writer_schema(),
@@ -257,7 +264,7 @@
                 row_count += batch.num_rows();
                 arrow_writer.write(&batch)?;
                 let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
-                if buff_to_flush.len() > 1024000 {
+                if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                     object_store_writer
                         .write_all(buff_to_flush.as_slice())
                         .await?;
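The two constants name the buffer-then-flush pattern the sink already used: batches are serialized into an in-memory buffer created with INITIAL_BUFFER_BYTES as a capacity hint, and the buffered bytes are drained to the object store writer once they exceed BUFFER_FLUSH_BYTES. Below is a minimal standalone sketch of that pattern, assuming a plain Vec<u8> and std::io::Write in place of SharedBuffer and the async object store writer; the buffer_and_flush helper and the dummy 200 KB chunks are illustrative, not DataFusion APIs.

use std::io::{self, Write};

/// Capacity hint for the in-memory buffer; it may still grow past this.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// Flush buffered bytes to the underlying writer once they exceed this size.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Append `chunk` to `buffer`, draining it into `sink` when it passes the threshold.
fn buffer_and_flush<W: Write>(
    buffer: &mut Vec<u8>,
    chunk: &[u8],
    sink: &mut W,
) -> io::Result<()> {
    buffer.extend_from_slice(chunk);
    if buffer.len() > BUFFER_FLUSH_BYTES {
        sink.write_all(buffer.as_slice())?;
        buffer.clear(); // keep the allocation around for the next chunks
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut buffer = Vec::with_capacity(INITIAL_BUFFER_BYTES);
    let mut sink = io::sink(); // stand-in for the object store writer
    let chunk = vec![0u8; 200_000]; // pretend this is one serialized record batch
    for _ in 0..10 {
        buffer_and_flush(&mut buffer, &chunk, &mut sink)?;
    }
    // Flush whatever is left once the stream of batches ends.
    sink.write_all(buffer.as_slice())?;
    Ok(())
}

Keeping the flush threshold (1,024,000) just under the initial capacity (1,048,576) means a flush is usually triggered before the buffer has to reallocate, though, as the doc comment notes, it can still grow past the hint if a single batch is large enough.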
