@@ -72,8 +72,8 @@ use object_store::path::Path;
7272use object_store:: { ObjectMeta , ObjectStore } ;
7373use parquet:: arrow:: arrow_reader:: statistics:: StatisticsConverter ;
7474use parquet:: arrow:: arrow_writer:: {
75- compute_leaves, get_column_writers , ArrowColumnChunk , ArrowColumnWriter ,
76- ArrowLeafColumn , ArrowWriterOptions ,
75+ compute_leaves, ArrowColumnChunk , ArrowColumnWriter , ArrowLeafColumn ,
76+ ArrowRowGroupWriterFactory , ArrowWriterOptions ,
7777} ;
7878use parquet:: arrow:: async_reader:: MetadataFetch ;
7979use parquet:: arrow:: { parquet_to_arrow_schema, ArrowSchemaConverter , AsyncArrowWriter } ;
@@ -1306,14 +1306,6 @@ impl FileSink for ParquetSink {
13061306 object_store : Arc < dyn ObjectStore > ,
13071307 ) -> Result < u64 > {
13081308 let parquet_opts = & self . parquet_options ;
1309- let mut allow_single_file_parallelism =
1310- parquet_opts. global . allow_single_file_parallelism ;
1311-
1312- if parquet_opts. crypto . file_encryption . is_some ( ) {
1313- // For now, arrow-rs does not support parallel writes with encryption
1314- // See https://github.com/apache/arrow-rs/issues/7359
1315- allow_single_file_parallelism = false ;
1316- }
13171309
13181310 let mut file_write_tasks: JoinSet <
13191311 std:: result:: Result < ( Path , FileMetaData ) , DataFusionError > ,
@@ -1330,7 +1322,7 @@ impl FileSink for ParquetSink {
13301322 } ;
13311323
13321324 while let Some ( ( path, mut rx) ) = file_stream_rx. recv ( ) . await {
1333- if !allow_single_file_parallelism {
1325+ if !parquet_opts . global . allow_single_file_parallelism {
13341326 let mut writer = self
13351327 . create_async_arrow_writer (
13361328 & path,
@@ -1458,13 +1450,13 @@ type ColSender = Sender<ArrowLeafColumn>;
14581450/// Returns join handles for each columns serialization task along with a send channel
14591451/// to send arrow arrays to each serialization task.
14601452fn spawn_column_parallel_row_group_writer (
1461- schema : Arc < Schema > ,
1462- parquet_props : Arc < WriterProperties > ,
1453+ arrow_row_group_writer_factory : Arc < ArrowRowGroupWriterFactory > ,
14631454 max_buffer_size : usize ,
14641455 pool : & Arc < dyn MemoryPool > ,
14651456) -> Result < ( Vec < ColumnWriterTask > , Vec < ColSender > ) > {
1466- let schema_desc = ArrowSchemaConverter :: new ( ) . convert ( & schema) ?;
1467- let col_writers = get_column_writers ( & schema_desc, & parquet_props, & schema) ?;
1457+ let arrow_row_group_writer =
1458+ arrow_row_group_writer_factory. create_row_group_writer ( 0 ) ?;
1459+ let col_writers = arrow_row_group_writer. into_column_writers ( ) ;
14681460 let num_columns = col_writers. len ( ) ;
14691461
14701462 let mut col_writer_tasks = Vec :: with_capacity ( num_columns) ;
@@ -1559,6 +1551,7 @@ fn spawn_rg_join_and_finalize_task(
15591551/// across both columns and row_groups, with a theoretical max number of parallel tasks
15601552/// given by n_columns * num_row_groups.
15611553fn spawn_parquet_parallel_serialization_task (
1554+ arrow_row_group_writer_factory : Arc < ArrowRowGroupWriterFactory > ,
15621555 mut data : Receiver < RecordBatch > ,
15631556 serialize_tx : Sender < SpawnedTask < RBStreamSerializeResult > > ,
15641557 schema : Arc < Schema > ,
@@ -1571,12 +1564,14 @@ fn spawn_parquet_parallel_serialization_task(
15711564 let max_row_group_rows = writer_props. max_row_group_size ( ) ;
15721565 let ( mut column_writer_handles, mut col_array_channels) =
15731566 spawn_column_parallel_row_group_writer (
1574- Arc :: clone ( & schema) ,
1575- Arc :: clone ( & writer_props) ,
1567+ arrow_row_group_writer_factory. clone ( ) ,
15761568 max_buffer_rb,
15771569 & pool,
15781570 ) ?;
15791571 let mut current_rg_rows = 0 ;
1572+ // TODO: row_group_writer should use the correct row group index. Currently this would fail if
1573+ // multiple row groups were written.
1574+ // let mut rg_index = 0;
15801575
15811576 while let Some ( mut rb) = data. recv ( ) . await {
15821577 // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
@@ -1623,8 +1618,7 @@ fn spawn_parquet_parallel_serialization_task(
16231618
16241619 ( column_writer_handles, col_array_channels) =
16251620 spawn_column_parallel_row_group_writer (
1626- Arc :: clone ( & schema) ,
1627- Arc :: clone ( & writer_props) ,
1621+ arrow_row_group_writer_factory. clone ( ) ,
16281622 max_buffer_rb,
16291623 & pool,
16301624 ) ?;
@@ -1655,24 +1649,15 @@ fn spawn_parquet_parallel_serialization_task(
16551649/// Consume RowGroups serialized by other parallel tasks and concatenate them in
16561650/// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
16571651async fn concatenate_parallel_row_groups (
1652+ mut parquet_writer : SerializedFileWriter < SharedBuffer > ,
1653+ merged_buff : SharedBuffer ,
16581654 mut serialize_rx : Receiver < SpawnedTask < RBStreamSerializeResult > > ,
1659- schema : Arc < Schema > ,
1660- writer_props : Arc < WriterProperties > ,
16611655 mut object_store_writer : Box < dyn AsyncWrite + Send + Unpin > ,
16621656 pool : Arc < dyn MemoryPool > ,
16631657) -> Result < FileMetaData > {
1664- let merged_buff = SharedBuffer :: new ( INITIAL_BUFFER_BYTES ) ;
1665-
16661658 let mut file_reservation =
16671659 MemoryConsumer :: new ( "ParquetSink(SerializedFileWriter)" ) . register ( & pool) ;
16681660
1669- let schema_desc = ArrowSchemaConverter :: new ( ) . convert ( schema. as_ref ( ) ) ?;
1670- let mut parquet_writer = SerializedFileWriter :: new (
1671- merged_buff. clone ( ) ,
1672- schema_desc. root_schema_ptr ( ) ,
1673- writer_props,
1674- ) ?;
1675-
16761661 while let Some ( task) = serialize_rx. recv ( ) . await {
16771662 let result = task. join_unwind ( ) . await ;
16781663 let mut rg_out = parquet_writer. next_row_group ( ) ?;
@@ -1723,28 +1708,47 @@ async fn output_single_parquet_file_parallelized(
17231708 let ( serialize_tx, serialize_rx) =
17241709 mpsc:: channel :: < SpawnedTask < RBStreamSerializeResult > > ( max_rowgroups) ;
17251710
1711+ let parquet_schema = ArrowSchemaConverter :: new ( )
1712+ . with_coerce_types ( parquet_props. coerce_types ( ) )
1713+ . convert ( & output_schema) ?;
1714+ let merged_buff = SharedBuffer :: new ( INITIAL_BUFFER_BYTES ) ;
1715+ let parquet_writer = SerializedFileWriter :: new (
1716+ merged_buff. clone ( ) ,
1717+ parquet_schema. root_schema_ptr ( ) ,
1718+ parquet_props. clone ( ) . into ( ) ,
1719+ ) ?;
1720+ let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory :: new (
1721+ & parquet_writer,
1722+ parquet_schema,
1723+ output_schema. clone ( ) ,
1724+ parquet_props. clone ( ) . into ( ) ,
1725+ ) ;
1726+
17261727 let arc_props = Arc :: new ( parquet_props. clone ( ) ) ;
17271728 let launch_serialization_task = spawn_parquet_parallel_serialization_task (
1729+ Arc :: new ( arrow_row_group_writer_factory) ,
17281730 data,
17291731 serialize_tx,
17301732 Arc :: clone ( & output_schema) ,
17311733 Arc :: clone ( & arc_props) ,
17321734 parallel_options,
17331735 Arc :: clone ( & pool) ,
17341736 ) ;
1737+
1738+ launch_serialization_task
1739+ . join_unwind ( )
1740+ . await
1741+ . map_err ( |e| DataFusionError :: ExecutionJoin ( Box :: new ( e) ) ) ??;
1742+
17351743 let file_metadata = concatenate_parallel_row_groups (
1744+ parquet_writer,
1745+ merged_buff,
17361746 serialize_rx,
1737- Arc :: clone ( & output_schema) ,
1738- Arc :: clone ( & arc_props) ,
17391747 object_store_writer,
17401748 pool,
17411749 )
17421750 . await ?;
17431751
1744- launch_serialization_task
1745- . join_unwind ( )
1746- . await
1747- . map_err ( |e| DataFusionError :: ExecutionJoin ( Box :: new ( e) ) ) ??;
17481752 Ok ( file_metadata)
17491753}
17501754
0 commit comments