Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ impl<W: Write + Send> ArrowWriter<W> {
/// This estimate is formed bu summing the values of
/// [`ArrowColumnWriter::memory_size`] all in progress columns.
pub fn memory_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
None => 0,
}
self.in_progress
.as_ref()
.map(ArrowRowGroupWriter::memory_size)
.unwrap_or_default()
}

/// Anticipated encoded size of the in progress row group.
Expand All @@ -280,21 +280,17 @@ impl<W: Write + Send> ArrowWriter<W> {
/// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
/// columns.
pub fn in_progress_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress
.writers
.iter()
.map(|x| x.get_estimated_total_bytes())
.sum(),
None => 0,
}
self.in_progress
.as_ref()
.map(ArrowRowGroupWriter::get_estimated_total_bytes)
.unwrap_or_default()
}

/// Returns the number of rows buffered in the in progress row group
pub fn in_progress_rows(&self) -> usize {
self.in_progress
.as_ref()
.map(|x| x.buffered_rows)
.map(ArrowRowGroupWriter::rows_count)
.unwrap_or_default()
}

Expand Down Expand Up @@ -817,22 +813,26 @@ impl ArrowColumnWriter {
}

/// Encodes [`RecordBatch`] to a parquet row group
struct ArrowRowGroupWriter {
pub struct ArrowRowGroupWriter {
writers: Vec<ArrowColumnWriter>,
schema: SchemaRef,
buffered_rows: usize,
}

impl ArrowRowGroupWriter {
fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
/// Creates a new [`ArrowRowGroupWriter`]
pub fn new(writers: Vec<ArrowColumnWriter>, arrow: &SchemaRef) -> Self {
Self {
writers,
schema: arrow.clone(),
buffered_rows: 0,
}
}

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
/// Encodes the provided [`RecordBatch`]
///
/// This will fail if the `batch`'s schema does not match the writer's schema.
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.buffered_rows += batch.num_rows();
let mut writers = self.writers.iter_mut();
for (field, column) in self.schema.fields().iter().zip(batch.columns()) {
Expand All @@ -843,7 +843,34 @@ impl ArrowRowGroupWriter {
Ok(())
}

fn close(self) -> Result<Vec<ArrowColumnChunk>> {
/// Estimated memory usage, in bytes, of this `ArrowRowGroupWriter`
///
/// This estimate is formed bu summing the values of
/// [`ArrowColumnWriter::memory_size`] all in progress columns.
pub fn memory_size(&self) -> usize {
self.writers.iter().map(|x| x.memory_size()).sum()
}

/// Anticipated encoded size of the in progress row group.
///
/// This estimate the row group size after being completely encoded is,
/// formed by summing the values of
/// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
/// columns.
pub fn get_estimated_total_bytes(&self) -> usize {
self.writers
.iter()
.map(|x| x.get_estimated_total_bytes())
.sum()
}

/// Returns the number of rows buffered
pub fn rows_count(&self) -> usize {
self.buffered_rows
}

/// Close this row group returning the list of written [`ArrowColumnChunk`]
pub fn close(self) -> Result<Vec<ArrowColumnChunk>> {
self.writers
.into_iter()
.map(|writer| writer.close())
Expand Down
Loading