Skip to content

Commit 08d74e7

Browse files
committed
allow for skip_arrow_metadata
1 parent 1551f4c commit 08d74e7

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ impl FileSink for ParquetSink {
13131313
.build()?;
13141314
let schema = get_writer_schema(&self.config);
13151315
let props = parquet_props.clone();
1316+
let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
13161317
let parallel_options_clone = parallel_options.clone();
13171318
let pool = Arc::clone(context.memory_pool());
13181319
file_write_tasks.spawn(async move {
@@ -1321,6 +1322,7 @@ impl FileSink for ParquetSink {
13211322
rx,
13221323
schema,
13231324
&props,
1325+
skip_arrow_metadata,
13241326
parallel_options_clone,
13251327
pool,
13261328
)
@@ -1647,7 +1649,8 @@ async fn output_single_parquet_file_parallelized(
16471649
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
16481650
data: Receiver<RecordBatch>,
16491651
output_schema: Arc<Schema>,
1650-
parquet_props: &WriterProperties,
1652+
writer_properties: &WriterProperties,
1653+
skip_arrow_metadata: bool,
16511654
parallel_options: ParallelParquetWriterOptions,
16521655
pool: Arc<dyn MemoryPool>,
16531656
) -> Result<FileMetaData> {
@@ -1657,20 +1660,22 @@ async fn output_single_parquet_file_parallelized(
16571660
mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
16581661

16591662
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1660-
let writer = ArrowWriter::try_new(
1663+
let options = ArrowWriterOptions::new()
1664+
.with_properties(writer_properties.clone())
1665+
.with_skip_arrow_metadata(skip_arrow_metadata);
1666+
let writer = ArrowWriter::try_new_with_options(
16611667
merged_buff.clone(),
16621668
Arc::clone(&output_schema),
1663-
Some(parquet_props.clone()),
1669+
options,
16641670
)?;
16651671
let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
16661672

1667-
let arc_props = Arc::new(parquet_props.clone());
16681673
let launch_serialization_task = spawn_parquet_parallel_serialization_task(
16691674
row_group_writer_factory,
16701675
data,
16711676
serialize_tx,
16721677
Arc::clone(&output_schema),
1673-
Arc::clone(&arc_props),
1678+
Arc::new(writer_properties.clone()),
16741679
parallel_options,
16751680
Arc::clone(&pool),
16761681
);

0 commit comments

Comments (0)