Commit 15c5b1a

Rebase fixes, switch to ArrowWriter approach

1 parent: 7b6d5d0
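In brief: this commit cleans up after a rebase (removing leftover `<<<<<<< HEAD`/`>>>>>>>` conflict markers and duplicated match arms), repoints the arrow-rs fork dependencies from the `multi-threaded_encrypted_writing` branch to `multi-threaded_encrypted_writing_2`, and replaces the `ArrowRowGroupWriterFactory` plumbing in the parallel Parquet writer with `ArrowWriter` and the column writers it hands out.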

File tree: 14 files changed (+234 −392 lines)

Cargo.lock

Lines changed: 194 additions & 319 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 6 additions & 6 deletions
@@ -89,20 +89,20 @@ ahash = { version = "0.8", default-features = false, features = [
     "runtime-rng",
 ] }
 apache-avro = { version = "0.17", default-features = false }
-arrow = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", features = [
+arrow = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", features = [
     "prettyprint",
     "chrono-tz",
 ] }

-arrow-buffer = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
+arrow-buffer = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", default-features = false }
 arrow-flight = { git = "https://github.com/rok/arrow-rs.git", features = [
     "flight-sql-experimental",
 ] }
-arrow-ipc = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [
+arrow-ipc = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", default-features = false, features = [
     "lz4",
 ] }
-arrow-ord = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
-arrow-schema = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
+arrow-ord = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", default-features = false }
+arrow-schema = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"
@@ -156,7 +156,7 @@ itertools = "0.14"
 log = "^0.4"
 object_store = { version = "0.12.3", default-features = false }
 parking_lot = "0.12"
-parquet = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [
+parquet = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing_2", default-features = false, features = [
     "arrow",
     "async",
     "object_store",

datafusion-examples/Cargo.toml

Lines changed: 12 additions & 0 deletions
@@ -32,6 +32,18 @@ rust-version = { workspace = true }
 [lints]
 workspace = true

+[[example]]
+name = "flight_sql_server"
+path = "examples/flight/flight_sql_server.rs"
+
+[[example]]
+name = "flight_server"
+path = "examples/flight/flight_server.rs"
+
+[[example]]
+name = "flight_client"
+path = "examples/flight/flight_client.rs"
+
 [[example]]
 name = "dataframe_to_s3"
 path = "examples/external_dependency/dataframe-to-s3.rs"

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ log = { workspace = true }
 object_store = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, default-features = true }
 paste = "1.0.15"
-pyo3 = { version = "0.25.1", optional = true }
+pyo3 = { version = "0.25", optional = true }
 recursive = { workspace = true, optional = true }
 sqlparser = { workspace = true }
 tokio = { workspace = true }
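Both requirements are caret ranges within 0.25.x, but `"0.25.1"` sets the floor at 0.25.1 while `"0.25"` accepts 0.25.0 as well, so the looser form gives the resolver more room to unify pyo3 with other crates in the dependency graph.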

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 0 additions & 33 deletions
@@ -25,8 +25,6 @@ use crate::{
     DataFusionError, Result, _internal_datafusion_err,
 };

-pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
-
 use arrow::datatypes::Schema;
 // TODO: handle once deprecated
 use crate::encryption::add_crypto_to_writer_properties;
@@ -163,19 +161,6 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
             builder =
                 builder.set_column_bloom_filter_ndv(path.clone(), bloom_filter_ndv);
         }
-<<<<<<< HEAD
-=======
-
-        // max_statistics_size is deprecated, currently it is not being used
-        // TODO: remove once deprecated
-        // #[allow(deprecated)]
-        // if let Some(max_statistics_size) = options.max_statistics_size {
-        //     builder = {
-        //         #[allow(deprecated)]
-        //         builder.set_column_max_statistics_size(path, max_statistics_size)
-        //     }
-        // }
->>>>>>> f1f6d637c (Initial commit)
     }

     Ok(builder)
@@ -224,10 +209,6 @@ impl ParquetOptions {
             dictionary_enabled,
             dictionary_page_size_limit,
             statistics_enabled,
-<<<<<<< HEAD
-=======
-            max_statistics_size: _max_statistics_size,
->>>>>>> f1f6d637c (Initial commit)
             max_row_group_size,
             created_by,
             column_index_truncate_length,
@@ -274,16 +255,6 @@ impl ParquetOptions {
             .set_data_page_row_count_limit(*data_page_row_count_limit)
             .set_bloom_filter_enabled(*bloom_filter_on_write);

-<<<<<<< HEAD
-=======
-        // builder = {
-        //     #[allow(deprecated)]
-        //     builder.set_max_statistics_size(
-        //         max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
-        //     )
-        // };
-
->>>>>>> f1f6d637c (Initial commit)
 if let Some(bloom_filter_fpp) = bloom_filter_fpp {
     builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);
 };
@@ -562,10 +533,6 @@ mod tests {
             ),
             bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
             bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
-<<<<<<< HEAD
-=======
-            max_statistics_size: Some(DEFAULT_MAX_STATISTICS_SIZE),
->>>>>>> f1f6d637c (Initial commit)
         }
     }

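The blocks deleted here are unresolved merge-conflict markers left behind by the rebase; with them in place `parquet_writer.rs` would not even parse. The resolution drops the commented-out `max_statistics_size` handling (and its now-unused `DEFAULT_MAX_STATISTICS_SIZE` constant) outright, which matches the comments in the removed code: the option is deprecated and was no longer being applied.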

datafusion/common/src/scalar/mod.rs

Lines changed: 1 addition & 3 deletions
@@ -2386,9 +2386,7 @@ impl ScalarValue {
             | DataType::Time64(TimeUnit::Millisecond)
             | DataType::RunEndEncoded(_, _)
             | DataType::ListView(_)
-            | DataType::LargeListView(_)
-            | DataType::Decimal32(_, _)
-            | DataType::Decimal64(_, _) => {
+            | DataType::LargeListView(_) => {
                 return _not_impl_err!(
                     "Unsupported creation of {:?} array from ScalarValue {:?}",
                     data_type,

datafusion/core/src/dataframe/parquet.rs

Lines changed: 0 additions & 1 deletion
@@ -278,7 +278,6 @@ mod tests {
         // Write encrypted parquet using write_parquet
         let mut options = TableParquetOptions::default();
         options.crypto.file_encryption = Some((&encrypt).into());
-        options.global.allow_single_file_parallelism = true;

         df.write_parquet(
             tempfile_str.as_str(),
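Worth noting: the encrypted-write test previously forced `allow_single_file_parallelism = true`; with that line gone it now runs with whatever the option defaults to, rather than explicitly opting in to the parallel writer path.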

datafusion/core/tests/fuzz_cases/pruning.rs

Lines changed: 1 addition & 1 deletion
@@ -314,7 +314,7 @@ async fn execute_with_predicate(
 }

 async fn write_parquet_file(
-    _truncation_length: Option<usize>,
+    truncation_length: Option<usize>,
     schema: Arc<Schema>,
     row_groups: Vec<Vec<String>>,
 ) -> Bytes {
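The leading underscore comes off `truncation_length`, which suggests the parameter is consumed again somewhere in the function body (not shown in this hunk); in Rust the `_` prefix only suppresses the unused-variable lint, so the rename itself changes nothing at the call sites.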

datafusion/datasource-avro/src/avro_to_arrow/schema.rs

Lines changed: 0 additions & 2 deletions
@@ -239,8 +239,6 @@ fn default_field_name(dt: &DataType) -> &str {
         DataType::Decimal64(_, _) => "decimal",
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
-        DataType::Decimal32(_, _) => "decimal",
-        DataType::Decimal64(_, _) => "decimal",
     }
 }

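This is rebase fallout of the same kind as the conflict markers above: the `Decimal32` and `Decimal64` arms ended up duplicated in the `match` (note the surviving `Decimal64` arm in the context lines), and rustc flags a second arm for the same variant as an unreachable pattern, which a deny-warnings build rejects.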

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 10 additions & 16 deletions
@@ -77,10 +77,10 @@ use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::arrow_writer::{
     compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
-    ArrowRowGroupWriterFactory, ArrowWriterOptions,
+    ArrowWriterOptions,
 };
 use parquet::arrow::async_reader::MetadataFetch;
-use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
+use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, ArrowWriter, AsyncArrowWriter};
 use parquet::basic::Type;

 use parquet::errors::ParquetError;
@@ -1463,13 +1463,10 @@ type ColSender = Sender<ArrowLeafColumn>;
 /// Returns join handles for each columns serialization task along with a send channel
 /// to send arrow arrays to each serialization task.
 fn spawn_column_parallel_row_group_writer(
-    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
+    col_writers: Vec<ArrowColumnWriter>,
     max_buffer_size: usize,
     pool: &Arc<dyn MemoryPool>,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
-    let arrow_row_group_writer =
-        arrow_row_group_writer_factory.create_row_group_writer(0)?;
-    let col_writers = arrow_row_group_writer.into_column_writers();
     let num_columns = col_writers.len();

     let mut col_writer_tasks = Vec::with_capacity(num_columns);
@@ -1564,7 +1561,7 @@ fn spawn_rg_join_and_finalize_task(
 /// across both columns and row_groups, with a theoretical max number of parallel tasks
 /// given by n_columns * num_row_groups.
 fn spawn_parquet_parallel_serialization_task(
-    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
+    arrow_writer: ArrowWriter<SerializedFileWriter<SharedBuffer>>,
     mut data: Receiver<RecordBatch>,
     serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
     schema: Arc<Schema>,
@@ -1575,9 +1572,10 @@ fn spawn_parquet_parallel_serialization_task(
     SpawnedTask::spawn(async move {
         let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
         let max_row_group_rows = writer_props.max_row_group_size();
+        let col_writers = arrow_writer.get_column_writers().unwrap();
         let (mut column_writer_handles, mut col_array_channels) =
             spawn_column_parallel_row_group_writer(
-                Arc::clone(&arrow_row_group_writer_factory),
+                col_writers,
                 max_buffer_rb,
                 &pool,
             )?;
@@ -1631,7 +1629,7 @@ fn spawn_parquet_parallel_serialization_task(

             (column_writer_handles, col_array_channels) =
                 spawn_column_parallel_row_group_writer(
-                    Arc::clone(&arrow_row_group_writer_factory),
+                    col_writers,
                     max_buffer_rb,
                     &pool,
                 )?;
@@ -1730,16 +1728,12 @@ async fn output_single_parquet_file_parallelized(
         parquet_schema.root_schema_ptr(),
         parquet_props.clone().into(),
     )?;
-    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(
-        &parquet_writer,
-        parquet_schema,
-        Arc::clone(&output_schema),
-        parquet_props.clone().into(),
-    );
+    let writer = ArrowWriter::try_new(
+        parquet_writer, Arc::clone(&output_schema), Some(parquet_props.clone()))?;

     let arc_props = Arc::new(parquet_props.clone());
     let launch_serialization_task = spawn_parquet_parallel_serialization_task(
-        Arc::new(arrow_row_group_writer_factory),
+        writer,
         data,
         serialize_tx,
         Arc::clone(&output_schema),
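To make the new shape concrete, here is a minimal sketch of the column-writer flow this change builds on, written against the upstream `parquet` crate's public low-level API (`get_column_writers`, `compute_leaves`, `ArrowColumnChunk::append_to_row_group`); the fork pinned above exposes the equivalent through the `ArrowWriter::get_column_writers()` method used in the diff. Encryption, task spawning, and memory accounting are all omitted, so treat this as an illustration of the data flow rather than the DataFusion implementation:

```rust
use std::sync::Arc;

use arrow::array::RecordBatch;
use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

/// Encode one batch as a single row group via per-column writers.
/// In the parallel path each ArrowColumnWriter would be driven by its
/// own task; here they run sequentially to keep the sketch short.
fn write_one_row_group(batch: &RecordBatch, out: &mut Vec<u8>) -> Result<()> {
    let schema = batch.schema();
    let props = Arc::new(WriterProperties::default());
    let parquet_schema = ArrowSchemaConverter::new().convert(&schema)?;

    // One independent encoder per leaf column; these are Send, so they
    // can be fanned out across worker tasks and joined once the row
    // group is full.
    let mut col_writers = get_column_writers(&parquet_schema, &props, &schema)?;
    for (writer, (field, array)) in col_writers
        .iter_mut()
        .zip(schema.fields().iter().zip(batch.columns()))
    {
        // compute_leaves flattens a (possibly nested) column into the
        // leaf arrays the Parquet encoder actually consumes.
        for leaf in compute_leaves(field.as_ref(), array)? {
            writer.write(&leaf)?;
        }
    }

    // Closing a column writer yields an encoded ArrowColumnChunk, which
    // is appended in schema order to the next row group of the file.
    let mut file_writer =
        SerializedFileWriter::new(out, parquet_schema.root_schema_ptr(), props)?;
    let mut row_group = file_writer.next_row_group()?;
    for writer in col_writers {
        writer.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    file_writer.close()?;
    Ok(())
}
```

Because each `ArrowColumnWriter` owns its encoder state independently, the inner loop can be fanned out one task per column, which is what `spawn_column_parallel_row_group_writer` does with the writers it now receives directly instead of minting them through a row-group writer factory.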
