@@ -1259,9 +1259,7 @@ impl FileSink for ParquetSink {
1259
1259
1260
1260
while let Some ( ( path, mut rx) ) = file_stream_rx. recv ( ) . await {
1261
1261
let parquet_props = self . create_writer_props ( & runtime, & path) . await ?;
1262
- if !allow_single_file_parallelism {
1263
- // let parquet_props = self.create_writer_props(&runtime, &path)?;
1264
- // if !parquet_opts.global.allow_single_file_parallelism {
1262
+ if !parquet_opts. global . allow_single_file_parallelism {
1265
1263
let mut writer = self
1266
1264
. create_async_arrow_writer (
1267
1265
& path,
@@ -1603,8 +1601,8 @@ async fn concatenate_parallel_row_groups(
1603
1601
let mut rg_out = parquet_writer. next_row_group ( ) ?;
1604
1602
for chunk in serialized_columns {
1605
1603
chunk. append_to_row_group ( & mut rg_out) ?;
1606
-
1607
1604
rg_reservation. free ( ) ;
1605
+
1608
1606
let mut buff_to_flush = merged_buff. buffer . try_lock ( ) . unwrap ( ) ;
1609
1607
file_reservation. try_resize ( buff_to_flush. len ( ) ) ?;
1610
1608
@@ -1637,7 +1635,7 @@ async fn output_single_parquet_file_parallelized(
1637
1635
object_store_writer : Box < dyn AsyncWrite + Send + Unpin > ,
1638
1636
data : Receiver < RecordBatch > ,
1639
1637
output_schema : Arc < Schema > ,
1640
- writer_properties : & WriterProperties ,
1638
+ parquet_props : & WriterProperties ,
1641
1639
skip_arrow_metadata : bool ,
1642
1640
parallel_options : ParallelParquetWriterOptions ,
1643
1641
pool : Arc < dyn MemoryPool > ,
@@ -1647,9 +1645,10 @@ async fn output_single_parquet_file_parallelized(
1647
1645
let ( serialize_tx, serialize_rx) =
1648
1646
mpsc:: channel :: < SpawnedTask < RBStreamSerializeResult > > ( max_rowgroups) ;
1649
1647
1648
+ let arc_props = Arc :: new ( parquet_props. clone ( ) ) ;
1650
1649
let merged_buff = SharedBuffer :: new ( INITIAL_BUFFER_BYTES ) ;
1651
1650
let options = ArrowWriterOptions :: new ( )
1652
- . with_properties ( writer_properties . clone ( ) )
1651
+ . with_properties ( parquet_props . clone ( ) )
1653
1652
. with_skip_arrow_metadata ( skip_arrow_metadata) ;
1654
1653
let writer = ArrowWriter :: try_new_with_options (
1655
1654
merged_buff. clone ( ) ,
@@ -1663,11 +1662,10 @@ async fn output_single_parquet_file_parallelized(
1663
1662
data,
1664
1663
serialize_tx,
1665
1664
Arc :: clone ( & output_schema) ,
1666
- Arc :: new ( writer_properties . clone ( ) ) ,
1665
+ Arc :: clone ( & arc_props ) ,
1667
1666
parallel_options,
1668
1667
Arc :: clone ( & pool) ,
1669
1668
) ;
1670
-
1671
1669
let file_metadata = concatenate_parallel_row_groups (
1672
1670
writer,
1673
1671
merged_buff,
@@ -1681,7 +1679,6 @@ async fn output_single_parquet_file_parallelized(
1681
1679
. join_unwind ( )
1682
1680
. await
1683
1681
. map_err ( |e| DataFusionError :: ExecutionJoin ( Box :: new ( e) ) ) ??;
1684
-
1685
1682
Ok ( file_metadata)
1686
1683
}
1687
1684
0 commit comments