20 changes: 19 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -409,6 +409,7 @@ impl<W: Write + Send> ArrowWriter<W> {
    }

    /// Create a new row group writer and return its column writers.
    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
Contributor:
Note to myself, these APIs were added in

    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
        self.flush()?;
        let in_progress = self
@@ -418,6 +419,7 @@ impl<W: Write + Send> ArrowWriter<W> {
    }

    /// Append the given column chunks to the file as a new row group.
    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
    pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        let mut row_group_writer = self.writer.next_row_group()?;
        for chunk in chunks {
@@ -426,6 +428,15 @@
        row_group_writer.close()?;
        Ok(())
    }

    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
Contributor:
I spent quite a while trying to figure out why we can't just use get_column_writers, as in the example, instead of ArrowRowGroupWriterFactory, and I (finally) realized the reason: the encryption configuration isn't passed to get_column_writers. ArrowRowGroupWriterFactory does have the encryption details and thus can create the correct ArrowColumnWriters.

I think it is somewhat of a strange API to create an ArrowWriter only to immediately destructure it into a SerializedFileWriter / the underlying writer. It is also unfortunate that we now have two different APIs for writing row groups in parallel, depending on encryption.

I have an idea to make the APIs better as a follow-on.

Member Author:
Agreed, it's odd. The motivation was to introduce as few new `pub` items as possible. I would be very curious about alternative API shapes.

Contributor:
#8162 (review) is my suggestion; the TL;DR is to make the ArrowRowGroupWriterFactory constructor public and deprecate get_column_writers.

Contributor:
> I spent quite a while trying to figure out why we can't just use get_column_writers.

Sorry, I realise now that I didn't make this very clear in my original issue (#7359).

One other factor is that we couldn't just use the WriterProperties passed to get_column_writers to internally create a new FileEncryptor. When a FileEncryptor is created for the SerializedFileWriter, it generates random AAD (additional authentication data), and this AAD has to be the same for all encrypted modules in the file.

Contributor:
I filed a ticket with a suggestion for a unified API:

    /// This can be useful to provide more control over how files are written.
    pub fn into_serialized_writer(
Contributor:
I spent quite some time trying to figure out why this API is needed -- specifically, "why do we need an ArrowWriter at all? Why not use SerializedFileWriter and get_column_writers directly, as shown in this example?"

After some study, I concluded that the reason we need to expose ArrowRowGroupWriterFactory is that ArrowRowGroupWriterFactory::create_column_writers also has the appropriate encryption properties.

It is unfortunate that we'll now have two different sets of APIs for creating column writers -- via get_column_writers AND ArrowRowGroupWriterFactory.

Member Author:
Agreed.

Contributor:
I filed a ticket with a suggestion for a unified API:

        mut self,
    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
        self.flush()?;
        Ok((self.writer, self.row_group_writer_factory))
    }
}
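
Taken together, the intended usage of into_serialized_writer looks roughly like the following. This is a minimal sketch, not code from this PR: the helper name write_one_row_group, the Vec<u8> sink, and the single-row-group flow are illustrative assumptions, while into_serialized_writer, create_column_writers, compute_leaves, and append_to_row_group are the APIs discussed in this diff.

```rust
use arrow_array::RecordBatch;
use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriter};
use parquet::errors::Result;

// Hedged sketch: encode one batch as a single row group via the new API.
fn write_one_row_group(batch: &RecordBatch) -> Result<Vec<u8>> {
    let writer = ArrowWriter::try_new(Vec::new(), batch.schema(), None)?;

    // Destructure into the low-level SerializedFileWriter plus a factory that
    // carries the file-level configuration (including encryption, if enabled).
    let (mut file_writer, factory) = writer.into_serialized_writer()?;

    // Column writers for row group 0; these are independent values that could
    // be handed to worker threads and driven in parallel.
    let mut col_writers = factory.create_column_writers(0)?;
    let mut leaf_idx = 0;
    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
        for leaf in compute_leaves(field.as_ref(), column)? {
            col_writers[leaf_idx].write(&leaf)?;
            leaf_idx += 1;
        }
    }

    // Close each column writer and append the finished chunks as a row group.
    let mut row_group = file_writer.next_row_group()?;
    for w in col_writers {
        w.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    file_writer.finish()?;
    file_writer.into_inner()
}
```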

impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -851,7 +862,8 @@ impl ArrowRowGroupWriter {
    }
}

struct ArrowRowGroupWriterFactory {
/// Factory that creates new column writers for each row group in the Parquet file.
pub struct ArrowRowGroupWriterFactory {
    schema: SchemaDescriptor,
    arrow_schema: SchemaRef,
    props: WriterPropertiesPtr,
@@ -906,6 +918,12 @@ impl ArrowRowGroupWriterFactory {
        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
    }

    /// Create column writers for a new row group.
    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
Contributor:
I see now that this is the key API -- it creates column writers with the appropriate encryption properties, when encryption is configured.

Contributor:
What do you think about making create_row_group_writer pub, removing this function, and using the into_writers function proposed in issue #8260?

Contributor:
So I am not sure making ArrowRowGroupWriter public gets us much of anything, and it would not allow per-column parallel encoding.

One benefit of getting the column writers individually is that the columns can then be encoded in parallel; ArrowRowGroupWriter can only write row groups in parallel.

I looked at ArrowRowGroupWriter a bit more, and the only substantial thing it does is loop over compute_leaves, which is already public:

struct ArrowRowGroupWriter {
    writers: Vec<ArrowColumnWriter>,
    schema: SchemaRef,
    buffered_rows: usize,
}

        let rg_writer = self.create_row_group_writer(row_group_index)?;
        Ok(rg_writer.writers)
    }
}
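
To make the per-column parallelism point above concrete, here is a hedged sketch of driving the writers returned by create_column_writers on scoped threads. It assumes ArrowColumnWriter and ArrowLeafColumn are Send (which handing them to worker threads requires); the function name and the Vec<Vec<ArrowLeafColumn>> shape (one inner Vec per leaf column, built with compute_leaves) are illustrative.

```rust
use std::thread;

use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn};
use parquet::errors::Result;

// Hedged sketch: encode each leaf column on its own scoped thread and
// collect the finished chunks in the original column order.
fn encode_columns_in_parallel(
    writers: Vec<ArrowColumnWriter>,
    leaves: Vec<Vec<ArrowLeafColumn>>,
) -> Result<Vec<ArrowColumnChunk>> {
    thread::scope(|scope| {
        let handles: Vec<_> = writers
            .into_iter()
            .zip(leaves)
            .map(|(mut writer, cols)| {
                scope.spawn(move || {
                    for col in &cols {
                        writer.write(col)?;
                    }
                    writer.close() // yields an ArrowColumnChunk
                })
            })
            .collect();
        // Join in spawn order so chunk order matches column order.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}
```

The resulting chunks can then be appended to a SerializedFileWriter row group, as in the sketch after into_serialized_writer above.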

/// Returns the [`ArrowColumnWriter`] for a given schema
67 changes: 2 additions & 65 deletions parquet/src/arrow/async_writer/mod.rs
@@ -61,7 +61,7 @@ mod store;
pub use store::*;

use crate::{
    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
    arrow::arrow_writer::ArrowWriterOptions,
    arrow::ArrowWriter,
    errors::{ParquetError, Result},
    file::{metadata::RowGroupMetaData, properties::WriterProperties},
@@ -288,34 +288,16 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {

        Ok(())
    }

    /// Create a new row group writer and return its column writers.
    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
Contributor:
I double-checked and this code is not yet released, so this is not a public API change. It was added in

Member Author:
It is very fresh indeed. I hope we're not spoiling some plan here. We did find the ArrowRowGroupWriterFactory route better, so I think it should be ok.

        let before = self.sync_writer.flushed_row_groups().len();
        let writers = self.sync_writer.get_column_writers()?;
        if before != self.sync_writer.flushed_row_groups().len() {
            self.do_write().await?;
        }
        Ok(writers)
    }

    /// Append the given column chunks to the file as a new row group.
    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
        self.sync_writer.append_row_group(chunks)?;
        self.do_write().await
    }
}

#[cfg(test)]
mod tests {
    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
    use bytes::Bytes;
    use std::sync::Arc;

    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
    use crate::arrow::arrow_writer::compute_leaves;

    use super::*;

    fn get_test_reader() -> ParquetRecordBatchReader {
@@ -349,51 +331,6 @@ mod tests {
        assert_eq!(to_write, read);
    }

    #[tokio::test]
Contributor:
Since we removed those APIs in this PR, we should also remove the test.

Member Author:
Agreed.

    async fn test_async_arrow_group_writer() {
        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef;
        let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let mut buffer = Vec::new();
        let mut writer =
            AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap();

        // Use classic API
        writer.write(&to_write_record).await.unwrap();

        let mut writers = writer.get_column_writers().await.unwrap();
        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();

        for (field, column) in to_write_arrow_group
            .schema()
            .fields()
            .iter()
            .zip(to_write_arrow_group.columns())
        {
            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
                writers[0].write(&leaf).unwrap();
            }
        }

        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
        // Append the arrow group as a new row group. Flush in progress
        writer.append_row_group(columns).await.unwrap();
        writer.close().await.unwrap();

        let buffer = Bytes::from(buffer);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
            .unwrap()
            .build()
            .unwrap();

        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef;
        let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();

        let read = reader.next().unwrap().unwrap();
        assert_eq!(expected, read);
    }

    // Read the data from the test file and write it with both the async and sync writers,
    // then compare the results of the two writers.
    #[tokio::test]