@@ -76,8 +76,8 @@ use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::arrow_writer::{
-    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
-    ArrowLeafColumn, ArrowWriterOptions,
+    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
+    ArrowRowGroupWriterFactory, ArrowWriterOptions,
 };
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
@@ -1319,14 +1319,6 @@ impl FileSink for ParquetSink {
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
         let parquet_opts = &self.parquet_options;
-        let mut allow_single_file_parallelism =
-            parquet_opts.global.allow_single_file_parallelism;
-
-        if parquet_opts.crypto.file_encryption.is_some() {
-            // For now, arrow-rs does not support parallel writes with encryption
-            // See https://github.com/apache/arrow-rs/issues/7359
-            allow_single_file_parallelism = false;
-        }

         let mut file_write_tasks: JoinSet<
             std::result::Result<(Path, FileMetaData), DataFusionError>,
@@ -1343,7 +1335,7 @@ impl FileSink for ParquetSink {
         };

         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            if !allow_single_file_parallelism {
+            if !parquet_opts.global.allow_single_file_parallelism {
                 let mut writer = self
                     .create_async_arrow_writer(
                         &path,
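With the encryption override removed above, the parallel path is now gated directly by `parquet_opts.global.allow_single_file_parallelism`. For context, a minimal sketch of turning that option off from a session config; this uses the standard `datafusion::config::ParquetOptions` field and is not part of this diff.

// Minimal sketch: disabling single-file write parallelism via DataFusion's session config.
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_context() -> SessionContext {
    let mut config = SessionConfig::new();
    // Equivalent to `SET datafusion.execution.parquet.allow_single_file_parallelism = false;`
    config.options_mut().execution.parquet.allow_single_file_parallelism = false;
    SessionContext::new_with_config(config)
}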
@@ -1471,13 +1463,13 @@ type ColSender = Sender<ArrowLeafColumn>;
 /// Returns join handles for each columns serialization task along with a send channel
 /// to send arrow arrays to each serialization task.
 fn spawn_column_parallel_row_group_writer(
-    schema: Arc<Schema>,
-    parquet_props: Arc<WriterProperties>,
+    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
     max_buffer_size: usize,
     pool: &Arc<dyn MemoryPool>,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
-    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
-    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let arrow_row_group_writer =
+        arrow_row_group_writer_factory.create_row_group_writer(0)?;
+    let col_writers = arrow_row_group_writer.into_column_writers();
     let num_columns = col_writers.len();

     let mut col_writer_tasks = Vec::with_capacity(num_columns);
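The per-row-group column writers now come from the factory's row-group writer instead of `get_column_writers`. For context, a minimal sketch of the per-column serialization task that these writers feed; the names and task shape are illustrative, and the DataFusion task additionally reserves memory from the passed `MemoryPool`, omitted here.

// Sketch of one column serialization task: drain ArrowLeafColumn values from a
// channel, encode them with the ArrowColumnWriter, and close it into a chunk.
use parquet::arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn};
use parquet::errors::Result;
use tokio::sync::mpsc::Receiver;

async fn column_task(
    mut writer: ArrowColumnWriter,
    mut rx: Receiver<ArrowLeafColumn>,
) -> Result<ArrowColumnChunk> {
    while let Some(leaf) = rx.recv().await {
        // Encode this slice of the column into the writer's in-memory buffers
        writer.write(&leaf)?;
    }
    // Closing yields the finished column chunk, later appended to a row group
    writer.close()
}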
@@ -1572,6 +1564,7 @@ fn spawn_rg_join_and_finalize_task(
 /// across both columns and row_groups, with a theoretical max number of parallel tasks
 /// given by n_columns * num_row_groups.
 fn spawn_parquet_parallel_serialization_task(
+    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
     mut data: Receiver<RecordBatch>,
     serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
     schema: Arc<Schema>,
@@ -1584,12 +1577,14 @@ fn spawn_parquet_parallel_serialization_task(
         let max_row_group_rows = writer_props.max_row_group_size();
         let (mut column_writer_handles, mut col_array_channels) =
             spawn_column_parallel_row_group_writer(
-                Arc::clone(&schema),
-                Arc::clone(&writer_props),
+                Arc::clone(&arrow_row_group_writer_factory),
                 max_buffer_rb,
                 &pool,
             )?;
         let mut current_rg_rows = 0;
+        // TODO: row_group_writer should use the correct row group index. Currently this would fail if
+        // multiple row groups were written.
+        // let mut rg_index = 0;

         while let Some(mut rb) = data.recv().await {
             // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
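The loop referenced in the comment above splits incoming batches so no row group exceeds `max_row_group_rows`. A simplified, illustrative sketch of that splitting step follows; the real loop also finalizes the filled row group and respawns column writers, as the surrounding hunks show.

// Illustrative sketch (not the exact DataFusion loop body): when a RecordBatch would
// overflow the configured row-group size, split it with RecordBatch::slice and carry
// the remainder into the next row group.
use arrow::record_batch::RecordBatch;

fn split_at_row_group_boundary(
    rb: RecordBatch,
    current_rg_rows: usize,
    max_row_group_rows: usize,
) -> (RecordBatch, Option<RecordBatch>) {
    if current_rg_rows + rb.num_rows() <= max_row_group_rows {
        // Fits in the current row group
        (rb, None)
    } else {
        // Take only the rows that still fit; the rest starts the next row group
        let rows_left = max_row_group_rows - current_rg_rows;
        let head = rb.slice(0, rows_left);
        let tail = rb.slice(rows_left, rb.num_rows() - rows_left);
        (head, Some(tail))
    }
}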
@@ -1636,8 +1631,7 @@ fn spawn_parquet_parallel_serialization_task(

                     (column_writer_handles, col_array_channels) =
                         spawn_column_parallel_row_group_writer(
-                            Arc::clone(&schema),
-                            Arc::clone(&writer_props),
+                            Arc::clone(&arrow_row_group_writer_factory),
                             max_buffer_rb,
                             &pool,
                         )?;
@@ -1668,24 +1662,15 @@ fn spawn_parquet_parallel_serialization_task(
 /// Consume RowGroups serialized by other parallel tasks and concatenate them in
 /// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
 async fn concatenate_parallel_row_groups(
+    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
+    merged_buff: SharedBuffer,
     mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
-    schema: Arc<Schema>,
-    writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
     pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
-    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
-
     let mut file_reservation =
         MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

-    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
-    let mut parquet_writer = SerializedFileWriter::new(
-        merged_buff.clone(),
-        schema_desc.root_schema_ptr(),
-        writer_props,
-    )?;
-
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
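The loop above drains one finished row group at a time from the serialization tasks. A simplified sketch of the append step it performs, with memory accounting and buffer flushing omitted; `append_row_group` is an illustrative helper, not a function in this file.

// Sketch: take the finished column chunks from one parallel row-group task and
// append them to the shared SerializedFileWriter as a new row group.
use parquet::arrow::arrow_writer::ArrowColumnChunk;
use parquet::errors::Result;
use parquet::file::writer::SerializedFileWriter;

fn append_row_group<W: std::io::Write + Send>(
    parquet_writer: &mut SerializedFileWriter<W>,
    chunks: Vec<ArrowColumnChunk>,
) -> Result<()> {
    let mut rg_out = parquet_writer.next_row_group()?;
    for chunk in chunks {
        // Each chunk is one fully-encoded column for this row group
        chunk.append_to_row_group(&mut rg_out)?;
    }
    // Close the row group; its bytes become available in the underlying buffer
    rg_out.close()?;
    Ok(())
}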
@@ -1736,28 +1721,47 @@ async fn output_single_parquet_file_parallelized(
     let (serialize_tx, serialize_rx) =
         mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(parquet_props.coerce_types())
+        .convert(&output_schema)?;
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+    let parquet_writer = SerializedFileWriter::new(
+        merged_buff.clone(),
+        parquet_schema.root_schema_ptr(),
+        parquet_props.clone().into(),
+    )?;
+    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(
+        &parquet_writer,
+        parquet_schema,
+        Arc::clone(&output_schema),
+        parquet_props.clone().into(),
+    );
+
     let arc_props = Arc::new(parquet_props.clone());
     let launch_serialization_task = spawn_parquet_parallel_serialization_task(
+        Arc::new(arrow_row_group_writer_factory),
         data,
         serialize_tx,
         Arc::clone(&output_schema),
         Arc::clone(&arc_props),
         parallel_options,
         Arc::clone(&pool),
     );
+
+    launch_serialization_task
+        .join_unwind()
+        .await
+        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
+
     let file_metadata = concatenate_parallel_row_groups(
+        parquet_writer,
+        merged_buff,
         serialize_rx,
-        Arc::clone(&output_schema),
-        Arc::clone(&arc_props),
         object_store_writer,
         pool,
     )
     .await?;

-    launch_serialization_task
-        .join_unwind()
-        .await
-        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
     Ok(file_metadata)
 }
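For orientation, a condensed sketch of how the writer pieces introduced in this hunk fit together. The converter, file writer, and factory calls mirror the diff above; the in-memory `Vec<u8>` sink and one-column schema are purely illustrative.

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::ArrowRowGroupWriterFactory;
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn build_writer_parts() -> Result<()> {
    let output_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let props = WriterProperties::builder().build();

    // Arrow schema -> Parquet schema descriptor, honoring the coerce_types setting
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&output_schema)?;

    // The low-level file writer owns the output sink; any `Write + Send` works here
    let parquet_writer = SerializedFileWriter::new(
        Vec::<u8>::new(),
        parquet_schema.root_schema_ptr(),
        props.clone().into(),
    )?;

    // The factory hands out per-row-group writers for the parallel column tasks
    let factory = ArrowRowGroupWriterFactory::new(
        &parquet_writer,
        parquet_schema,
        Arc::clone(&output_schema),
        props.into(),
    );

    // One set of column writers per row group; index 0 is the first row group
    let col_writers = factory.create_row_group_writer(0)?.into_column_writers();
    assert_eq!(col_writers.len(), 1);
    Ok(())
}

Building the `SerializedFileWriter` and the factory up front lets the same writer be handed to `concatenate_parallel_row_groups` while the factory is shared with the serialization task.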