Skip to content

Commit a1d15cc

Browse files
authored
feat: support multi-threaded writing of Parquet files with modular encryption (#16738)
* Initial commit diff --git c/Cargo.lock i/Cargo.lock index 749971532..f0b9d0a5f 100644 --- c/Cargo.lock +++ i/Cargo.lock @@ -246,52 +246,62 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd798aea3553913a5986813e9c6ad31a2d2b04e931fe8ea4a37155eb541cebb5" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", + "arrow-arith 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "arrow-csv", - "arrow-data", - "arrow-ipc", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "arrow-json", - "arrow-ord", + "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "arrow-pyarrow", - "arrow-row", - "arrow-schema", - "arrow-select", - "arrow-string", + "arrow-row 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-string 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "half", "rand 0.9.2", ] [[package]] name = "arrow-arith" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "508dafb53e5804a238cab7fd97a59ddcbfab20cc4d9814b1ab5465b9fa147f2e" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "chrono", + "num", +] + +[[package]] +name = "arrow-arith" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "chrono", "num", ] [[package]] name = "arrow-array" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2730bc045d62bb2e53ef8395b7d4242f5c8102f41ceac15e8395b9ac3d08461" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ "ahash 0.8.12", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "chrono", "chrono-tz", "half", @@ -299,11 +309,35 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-array" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "ahash 0.8.12", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "chrono", + "half", + "hashbrown 0.15.4", + "num", +] + [[package]] name = "arrow-buffer" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54295b93beb702ee9a6f6fbced08ad7f4d76ec1c297952d4b83cf68755421d1d" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" dependencies = [ "bytes", "half", @@ -312,15 +346,14 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67e8bcb7dc971d779a7280593a1bf0c2743533b8028909073e804552e85e75b5" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "atoi", "base64 0.22.1", "chrono", @@ -332,14 +365,32 @@ dependencies = [ ] [[package]] -name = "arrow-csv" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "673fd2b5fb57a1754fdbfac425efd7cf54c947ac9950c1cce86b14e248f1c458" +name = "arrow-cast" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" dependencies = [ - "arrow-array", - "arrow-cast", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "chrono", "csv", "csv-core", @@ -348,33 +399,42 @@ dependencies = [ [[package]] name = "arrow-data" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97c22fe3da840039c69e9f61f81e78092ea36d57037b4900151f063615a2f6b4" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-buffer", - "arrow-schema", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "half", + "num", +] + +[[package]] +name = "arrow-data" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "half", "num", ] [[package]] name = "arrow-flight" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6808d235786b721e49e228c44dd94242f2e8b46b7e95b233b0733c46e758bfee" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" dependencies = [ - "arrow-arith", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-ipc", - "arrow-ord", - "arrow-row", - "arrow-schema", - "arrow-select", - "arrow-string", + "arrow-arith 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-row 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-string 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "base64 0.22.1", "bytes", "futures", @@ -382,35 +442,45 @@ dependencies = [ "paste", "prost", "prost-types", - "tonic", + "tonic 0.12.3", ] [[package]] name = "arrow-ipc" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "778de14c5a69aedb27359e3dd06dd5f9c481d5f6ee9fbae912dba332fd64636b" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "flatbuffers", "lz4_flex", "zstd", ] [[package]] -name = "arrow-json" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3860db334fe7b19fcf81f6b56f8d9d95053f3839ffe443d56b5436f7a29a1794" +name = "arrow-ipc" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "chrono", "half", "indexmap 2.10.0", @@ -424,78 +494,130 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "425fa0b42a39d3ff55160832e7c25553e7f012c3f187def3d70313e7a29ba5d9" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", +] + +[[package]] +name = "arrow-ord" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", ] [[package]] name = "arrow-pyarrow" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d944d8ae9b77230124e6570865b570416c33a5809f32c4136c679bbe774e45c9" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "pyo3", ] [[package]] name = "arrow-row" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df9c9423c9e71abd1b08a7f788fcd203ba2698ac8e72a1f236f1faa1a06a7414" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "half", +] + +[[package]] +name = "arrow-row" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "half", ] [[package]] name = "arrow-schema" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fa1babc4a45fdc64a92175ef51ff00eba5ebbc0007962fecf8022ac1c6ce28" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ "bitflags 2.9.1", "serde", "serde_json", ] +[[package]] +name = "arrow-schema" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" + [[package]] name = "arrow-select" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8854d15f1cf5005b4b358abeb60adea17091ff5bdd094dca5d3f73787d81170" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ "ahash 0.8.12", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "num", +] + +[[package]] +name = "arrow-select" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "ahash 0.8.12", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "num", ] [[package]] name = "arrow-string" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c477e8b89e1213d5927a2a84a72c384a9bf4dd0dbf15f9fd66d821aafd9e95e" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "memchr", + "num", + "regex", + "regex-syntax", +] + +[[package]] +name = "arrow-string" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +dependencies = [ + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", "memchr", "num", "regex", @@ -567,6 +689,28 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -827,7 +971,7 @@ dependencies = [ "rustls-native-certs", "rustls-pki-types", "tokio", - "tower", + "tower 0.5.2", "tracing", ] @@ -948,18 +1092,19 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.4" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ - "axum-core", + "async-trait", + "axum-core 0.4.5", "bytes", "futures-util", "http 1.3.1", "http-body 1.0.1", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -967,7 +1112,53 @@ dependencies = [ "rustversion", "serde", "sync_wrapper", - "tower", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" +dependencies = [ + "axum-core 0.5.2", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", "tower-layer", "tower-service", ] @@ -1818,8 +2009,8 @@ name = "datafusion" version = "49.0.1" dependencies = [ "arrow", - "arrow-ipc", - "arrow-schema", + "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "async-trait", "bytes", "bzip2 0.6.0", @@ -1996,7 +2187,7 @@ dependencies = [ "ahash 0.8.12", "apache-avro", "arrow", - "arrow-ipc", + "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "base64 0.22.1", "chrono", "half", @@ -2176,7 +2367,7 @@ version = "49.0.1" dependencies = [ "arrow", "arrow-flight", - "arrow-schema", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "async-trait", "base64 0.22.1", "bytes", @@ -2197,7 +2388,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", - "tonic", + "tonic 0.13.1", "tracing", "tracing-subscriber", "url", @@ -2264,7 +2455,7 @@ version = "49.0.1" dependencies = [ "abi_stable", "arrow", - "arrow-schema", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "async-ffi", "async-trait", "datafusion", @@ -2284,7 +2475,7 @@ name = "datafusion-functions" version = "49.0.1" dependencies = [ "arrow", - "arrow-buffer", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "base64 0.22.1", "blake2", "blake3", @@ -2347,7 +2538,7 @@ name = "datafusion-functions-nested" version = "49.0.1" dependencies = [ "arrow", - "arrow-ord", + "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "criterion", "datafusion-common", "datafusion-doc", @@ -2517,8 +2708,8 @@ version = "49.0.1" dependencies = [ "ahash 0.8.12", "arrow", - "arrow-ord", - "arrow-schema", + "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "async-trait", "chrono", "criterion", @@ -2589,7 +2780,7 @@ name = "datafusion-pruning" version = "49.0.1" dependencies = [ "arrow", - "arrow-schema", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "datafusion-common", "datafusion-datasource", "datafusion-expr", @@ -4157,6 +4348,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -4529,18 +4726,17 @@ dependencies = [ [[package]] name = "parquet" -version = "56.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7288a07ed5d25939a90f9cb1ca5afa6855faa08ec7700613511ae64bdb0620c" +version = "55.2.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" dependencies = [ "ahash 0.8.12", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-ipc", - "arrow-schema", - "arrow-select", + "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", "base64 0.22.1", "brotli", "bytes", @@ -5449,7 +5645,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-util", - "tower", + "tower 0.5.2", "tower-http", "tower-service", "url", @@ -6681,12 +6877,13 @@ dependencies = [ [[package]] name = "tonic" -version = "0.13.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ + "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", "h2", @@ -6702,7 +6899,56 @@ dependencies = [ "socket2 0.5.10", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "axum 0.8.4", + "base64 0.22.1", + "bytes", + "h2", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.10", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -6740,7 +6986,7 @@ dependencies = [ "http-body 1.0.1", "iri-string", "pin-project-lite", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", ] diff --git c/Cargo.toml i/Cargo.toml index 5915035cd..5ee3cc566 100644 --- c/Cargo.toml +++ i/Cargo.toml @@ -90,19 +90,20 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "56.0.0", features = [ +arrow = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "56.0.0", default-features = false } -arrow-flight = { version = "56.0.0", features = [ + +arrow-buffer = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false } +arrow-flight = { git = "https://github.com/rok/arrow-rs.git", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "56.0.0", default-features = false, features = [ +arrow-ipc = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "56.0.0", default-features = false } -arrow-schema = { version = "56.0.0", default-features = false } +arrow-ord = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false } +arrow-schema = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false } async-trait = "0.1.89" bigdecimal = "0.4.8" bytes = "1.10" @@ -157,7 +158,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.12.3", default-features = false } parking_lot = "0.12" -parquet = { version = "56.0.0", default-features = false, features = [ +parquet = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [ "arrow", "async", "object_store", diff --git c/datafusion-examples/Cargo.toml i/datafusion-examples/Cargo.toml index f12bd9202..b4c8d3507 100644 --- c/datafusion-examples/Cargo.toml +++ i/datafusion-examples/Cargo.toml @@ -32,18 +32,6 @@ rust-version = { workspace = true } [lints] workspace = true -[[example]] -name = "flight_sql_server" -path = "examples/flight/flight_sql_server.rs" - -[[example]] -name = "flight_server" -path = "examples/flight/flight_server.rs" - -[[example]] -name = "flight_client" -path = "examples/flight/flight_client.rs" - [[example]] name = "dataframe_to_s3" path = "examples/external_dependency/dataframe-to-s3.rs" diff --git c/datafusion/common/Cargo.toml i/datafusion/common/Cargo.toml index afd74c7be..8040b3ad1 100644 --- c/datafusion/common/Cargo.toml +++ i/datafusion/common/Cargo.toml @@ -71,7 +71,7 @@ log = { workspace = true } object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } paste = "1.0.15" -pyo3 = { version = "0.25", optional = true } +pyo3 = { version = "0.25.1", optional = true } recursive = { workspace = true, optional = true } sqlparser = { workspace = true } tokio = { workspace = true } diff --git c/datafusion/common/src/file_options/parquet_writer.rs i/datafusion/common/src/file_options/parquet_writer.rs index 185826aef..d7b490af0 100644 --- c/datafusion/common/src/file_options/parquet_writer.rs +++ i/datafusion/common/src/file_options/parquet_writer.rs @@ -25,6 +25,8 @@ use crate::{ DataFusionError, Result, _internal_datafusion_err, }; +pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; + use arrow::datatypes::Schema; // TODO: handle once deprecated #[allow(deprecated)] diff --git c/datafusion/common/src/scalar/mod.rs i/datafusion/common/src/scalar/mod.rs index 51247612e..8f8c52086 100644 --- c/datafusion/common/src/scalar/mod.rs +++ i/datafusion/common/src/scalar/mod.rs @@ -2386,7 +2386,9 @@ impl ScalarValue { | DataType::Time64(TimeUnit::Millisecond) | DataType::RunEndEncoded(_, _) | DataType::ListView(_) - | DataType::LargeListView(_) => { + | DataType::LargeListView(_) + | DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) => { return _not_impl_err!( "Unsupported creation of {:?} array from ScalarValue {:?}", data_type, diff --git c/datafusion/core/src/dataframe/parquet.rs i/datafusion/core/src/dataframe/parquet.rs index 83bb60184..01149c1ec 100644 --- c/datafusion/core/src/dataframe/parquet.rs +++ i/datafusion/core/src/dataframe/parquet.rs @@ -278,6 +278,7 @@ mod tests { // Write encrypted parquet using write_parquet let mut options = TableParquetOptions::default(); options.crypto.file_encryption = Some((&encrypt).into()); + options.global.allow_single_file_parallelism = true; df.write_parquet( tempfile_str.as_str(), diff --git c/datafusion/core/tests/fuzz_cases/pruning.rs i/datafusion/core/tests/fuzz_cases/pruning.rs index c6e30c072..4ab1f08f1 100644 --- c/datafusion/core/tests/fuzz_cases/pruning.rs +++ i/datafusion/core/tests/fuzz_cases/pruning.rs @@ -314,7 +314,7 @@ async fn execute_with_predicate( } async fn write_parquet_file( - truncation_length: Option<usize>, + _truncation_length: Option<usize>, schema: Arc<Schema>, row_groups: Vec<Vec<String>>, ) -> Bytes { diff --git c/datafusion/datasource-avro/src/avro_to_arrow/schema.rs i/datafusion/datasource-avro/src/avro_to_arrow/schema.rs index cc87d3c1c..00b3f9d6d 100644 --- c/datafusion/datasource-avro/src/avro_to_arrow/schema.rs +++ i/datafusion/datasource-avro/src/avro_to_arrow/schema.rs @@ -239,6 +239,8 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Decimal64(_, _) => "decimal", DataType::Decimal128(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal", + DataType::Decimal32(_, _) => "decimal", + DataType::Decimal64(_, _) => "decimal", } } diff --git c/datafusion/datasource-parquet/src/file_format.rs i/datafusion/datasource-parquet/src/file_format.rs index 56718534a..934a7b2ee 100644 --- c/datafusion/datasource-parquet/src/file_format.rs +++ i/datafusion/datasource-parquet/src/file_format.rs @@ -78,8 +78,8 @@ use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_writer::{ - compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, - ArrowLeafColumn, ArrowWriterOptions, + compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, + ArrowRowGroupWriterFactory, ArrowWriterOptions, }; use parquet::arrow::async_reader::MetadataFetch; use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; @@ -1570,7 +1570,7 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path)?; - if !allow_single_file_parallelism { + if !parquet_opts.global.allow_single_file_parallelism { let mut writer = self .create_async_arrow_writer( &path, @@ -1698,13 +1698,13 @@ type ColSender = Sender<ArrowLeafColumn>; /// Returns join handles for each columns serialization task along with a send channel /// to send arrow arrays to each serialization task. fn spawn_column_parallel_row_group_writer( - schema: Arc<Schema>, - parquet_props: Arc<WriterProperties>, + arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>, max_buffer_size: usize, pool: &Arc<dyn MemoryPool>, ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> { - let schema_desc = ArrowSchemaConverter::new().convert(&schema)?; - let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?; + let arrow_row_group_writer = + arrow_row_group_writer_factory.create_row_group_writer(0)?; + let col_writers = arrow_row_group_writer.into_column_writers(); let num_columns = col_writers.len(); let mut col_writer_tasks = Vec::with_capacity(num_columns); @@ -1799,6 +1799,7 @@ fn spawn_rg_join_and_finalize_task( /// across both columns and row_groups, with a theoretical max number of parallel tasks /// given by n_columns * num_row_groups. fn spawn_parquet_parallel_serialization_task( + arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>, mut data: Receiver<RecordBatch>, serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>, schema: Arc<Schema>, @@ -1811,12 +1812,14 @@ fn spawn_parquet_parallel_serialization_task( let max_row_group_rows = writer_props.max_row_group_size(); let (mut column_writer_handles, mut col_array_channels) = spawn_column_parallel_row_group_writer( - Arc::clone(&schema), - Arc::clone(&writer_props), + Arc::clone(&arrow_row_group_writer_factory), max_buffer_rb, &pool, )?; let mut current_rg_rows = 0; + // TODO: row_group_writer should use the correct row group index. Currently this would fail if + // multiple row groups were written. + // let mut rg_index = 0; while let Some(mut rb) = data.recv().await { // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case @@ -1863,8 +1866,7 @@ fn spawn_parquet_parallel_serialization_task( (column_writer_handles, col_array_channels) = spawn_column_parallel_row_group_writer( - Arc::clone(&schema), - Arc::clone(&writer_props), + Arc::clone(&arrow_row_group_writer_factory), max_buffer_rb, &pool, )?; @@ -1895,24 +1897,15 @@ fn spawn_parquet_parallel_serialization_task( /// Consume RowGroups serialized by other parallel tasks and concatenate them in /// to the final parquet file, while flushing finalized bytes to an [ObjectStore] async fn concatenate_parallel_row_groups( + mut parquet_writer: SerializedFileWriter<SharedBuffer>, + merged_buff: SharedBuffer, mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>, - schema: Arc<Schema>, - writer_props: Arc<WriterProperties>, mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>, pool: Arc<dyn MemoryPool>, ) -> Result<FileMetaData> { - let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES); - let mut file_reservation = MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool); - let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?; - let mut parquet_writer = SerializedFileWriter::new( - merged_buff.clone(), - schema_desc.root_schema_ptr(), - writer_props, - )?; - while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; let mut rg_out = parquet_writer.next_row_group()?; @@ -1963,8 +1956,25 @@ async fn output_single_parquet_file_parallelized( let (serialize_tx, serialize_rx) = mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups); + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(parquet_props.coerce_types()) + .convert(&output_schema)?; + let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES); + let parquet_writer = SerializedFileWriter::new( + merged_buff.clone(), + parquet_schema.root_schema_ptr(), + parquet_props.clone().into(), + )?; + let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new( + &parquet_writer, + parquet_schema, + Arc::clone(&output_schema), + parquet_props.clone().into(), + ); + let arc_props = Arc::new(parquet_props.clone()); let launch_serialization_task = spawn_parquet_parallel_serialization_task( + Arc::new(arrow_row_group_writer_factory), data, serialize_tx, Arc::clone(&output_schema), @@ -1972,19 +1982,21 @@ async fn output_single_parquet_file_parallelized( parallel_options, Arc::clone(&pool), ); - let file_metadata = concatenate_parallel_row_groups( - serialize_rx, - Arc::clone(&output_schema), - Arc::clone(&arc_props), - object_store_writer, - pool, - ) - .await?; launch_serialization_task .join_unwind() .await .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; + + let file_metadata = concatenate_parallel_row_groups( + parquet_writer, + merged_buff, + serialize_rx, + object_store_writer, + pool, + ) + .await?; + Ok(file_metadata) } diff --git c/datafusion/expr/src/utils.rs i/datafusion/expr/src/utils.rs index 7a612b6fe..cd8e419ac 100644 --- c/datafusion/expr/src/utils.rs +++ i/datafusion/expr/src/utils.rs @@ -818,6 +818,8 @@ pub fn can_hash(data_type: &DataType) -> bool { DataType::Decimal64(_, _) => true, DataType::Decimal128(_, _) => true, DataType::Decimal256(_, _) => true, + DataType::Decimal32(_, _) => true, + DataType::Decimal64(_, _) => true, DataType::Timestamp(_, _) => true, DataType::Utf8 => true, DataType::LargeUtf8 => true, diff --git c/datafusion/sql/src/unparser/expr.rs i/datafusion/sql/src/unparser/expr.rs index 0501a4e04..86c648cba 100644 --- c/datafusion/sql/src/unparser/expr.rs +++ i/datafusion/sql/src/unparser/expr.rs @@ -1729,7 +1729,9 @@ impl Unparser<'_> { not_impl_err!("Unsupported DataType: conversion: {data_type:?}") } DataType::Decimal128(precision, scale) - | DataType::Decimal256(precision, scale) => { + | DataType::Decimal256(precision, scale) + | DataType::Decimal32(precision, scale) + | DataType::Decimal64(precision, scale) => { let mut new_precision = *precision as u64; let mut new_scale = *scale as u64; if *scale < 0 { diff --git c/datafusion/sqllogictest/test_files/copy.slt i/datafusion/sqllogictest/test_files/copy.slt index 096cde86f..e16fcfe84 100644 --- c/datafusion/sqllogictest/test_files/copy.slt +++ i/datafusion/sqllogictest/test_files/copy.slt @@ -306,7 +306,7 @@ select * from validate_struct_with_array; # Copy parquet with all supported statement overrides -query I +query error DataFusion error: Invalid or Unsupported Configuration: Config value "max_statistics_size" not found on ParquetOptions COPY source_table TO 'test_files/scratch/copy/table_with_options/' STORED AS PARQUET @@ -336,8 +336,6 @@ OPTIONS ( 'format.bloom_filter_ndv' 100, 'format.metadata::key' 'value' ) ----- -2 # valid vs invalid metadata @@ -404,11 +402,8 @@ OPTIONS ( statement ok CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 'test_files/scratch/copy/table_with_options/'; -query IT +statement count 0 select * from validate_parquet_with_options; ----- -1 Foo -2 Bar # Copy from table to single file query I diff --git c/datafusion/substrait/src/logical_plan/consumer/utils.rs i/datafusion/substrait/src/logical_plan/consumer/utils.rs index f7eedcb7a..f809bc82a 100644 --- c/datafusion/substrait/src/logical_plan/consumer/utils.rs +++ i/datafusion/substrait/src/logical_plan/consumer/utils.rs @@ -216,7 +216,9 @@ pub fn rename_data_type( | DataType::Decimal32(_, _) | DataType::Decimal64(_, _) | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) => Ok(data_type.clone()), + | DataType::Decimal256(_, _) + | DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) => Ok(data_type.clone()), } } * Rebase fixes, switch to ArrowWriter approach diff --git c/Cargo.lock i/Cargo.lock index f0b9d0a5f..373239aab 100644 --- c/Cargo.lock +++ i/Cargo.lock @@ -246,62 +246,62 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-arith 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-arith 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-cast 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "arrow-csv", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-ipc 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "arrow-json", - "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-ord 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "arrow-pyarrow", - "arrow-row 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-string 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-row 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-string 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "half", "rand 0.9.2", ] [[package]] name = "arrow-arith" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "chrono", "num", ] [[package]] name = "arrow-arith" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "chrono", "num", ] [[package]] name = "arrow-array" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ "ahash 0.8.12", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "chrono", "chrono-tz", "half", @@ -311,13 +311,13 @@ dependencies = [ [[package]] name = "arrow-array" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ "ahash 0.8.12", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "chrono", "half", "hashbrown 0.15.4", @@ -326,8 +326,8 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ "bytes", "half", @@ -336,8 +336,8 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ "bytes", "half", @@ -346,14 +346,14 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "atoi", "base64 0.22.1", "chrono", @@ -366,14 +366,14 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "atoi", "base64 0.22.1", "chrono", @@ -385,12 +385,12 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-cast 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "chrono", "csv", "csv-core", @@ -399,42 +399,42 @@ dependencies = [ [[package]] name = "arrow-data" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "half", "num", ] [[package]] name = "arrow-data" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "half", "num", ] [[package]] name = "arrow-flight" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-arith 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-ipc 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-ord 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-row 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-string 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-arith 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-cast 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-ipc 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-ord 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-row 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-string 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "base64 0.22.1", "bytes", "futures", @@ -442,18 +442,18 @@ dependencies = [ "paste", "prost", "prost-types", - "tonic 0.12.3", + "tonic", ] [[package]] name = "arrow-ipc" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "flatbuffers", "lz4_flex", "zstd", @@ -461,26 +461,26 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", "flatbuffers", ] [[package]] name = "arrow-json" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-cast 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-cast 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", "chrono", "half", "indexmap 2.10.0", @@ -494,67 +494,67 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2)", ] [[package]] name = "arrow-ord" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git#674dc17b2c423be16d0725a6537b0063ac7b1b58" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git#876585c1cd986dbaee0c26d52b55a4186a2f68c8" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-buffer 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-schema 55.2.0 (git+https://github.com/rok/arrow-rs.git)", - "arrow-select 55.2.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-array 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-buffer 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-data 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-schema 56.0.0 (git+https://github.com/rok/arrow-rs.git)", + "arrow-select 56.0.0 (git+https://github.com/rok/arrow-rs.git)", ] [[package]] name = "arrow-pyarrow" -version = "55.2.0" -source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing#b9396ccee27a39c91feccc982f5e976f0c0ff6d8" +version = "56.0.0" +source = "git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing_2#d9590212db94de291203220e2ed0beb808c69072" dependencies = [ - "arrow-array 55.2.0 (git+https://github.com/rok/arrow-rs.git?branch=multi-threaded_encrypted_writing)", - "arrow-data 55.2.0 (git+https://github.com/rok/arrow-rs.git?bra…
1 parent 948f6b8 commit a1d15cc

File tree

3 files changed

+43
-49
lines changed

3 files changed

+43
-49
lines changed

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ parking_lot = { workspace = true }
149149
parquet = { workspace = true, optional = true, default-features = true }
150150
rand = { workspace = true }
151151
regex = { workspace = true }
152+
rstest = { workspace = true }
152153
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
153154
sqlparser = { workspace = true, optional = true }
154155
tempfile = { workspace = true }

datafusion/core/src/dataframe/parquet.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ impl DataFrame {
102102

103103
#[cfg(test)]
104104
mod tests {
105+
use rstest::rstest;
105106
use std::collections::HashMap;
106107
use std::sync::Arc;
107108

@@ -247,9 +248,12 @@ mod tests {
247248
Ok(())
248249
}
249250

251+
#[rstest]
250252
#[cfg(feature = "parquet_encryption")]
251253
#[tokio::test]
252-
async fn roundtrip_parquet_with_encryption() -> Result<()> {
254+
async fn roundtrip_parquet_with_encryption(
255+
#[values(false, true)] allow_single_file_parallelism: bool,
256+
) -> Result<()> {
253257
use parquet::encryption::decrypt::FileDecryptionProperties;
254258
use parquet::encryption::encrypt::FileEncryptionProperties;
255259

@@ -278,6 +282,7 @@ mod tests {
278282
// Write encrypted parquet using write_parquet
279283
let mut options = TableParquetOptions::default();
280284
options.crypto.file_encryption = Some((&encrypt).into());
285+
options.global.allow_single_file_parallelism = allow_single_file_parallelism;
281286

282287
df.write_parquet(
283288
tempfile_str.as_str(),

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 36 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ use object_store::buffered::BufWriter;
7171
use object_store::path::Path;
7272
use object_store::{ObjectMeta, ObjectStore};
7373
use parquet::arrow::arrow_writer::{
74-
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
75-
ArrowLeafColumn, ArrowWriterOptions,
74+
compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
75+
ArrowRowGroupWriterFactory, ArrowWriterOptions,
7676
};
7777
use parquet::arrow::async_reader::MetadataFetch;
78-
use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter};
78+
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
7979
use parquet::basic::Type;
8080

8181
use crate::metadata::DFParquetMetadata;
@@ -1128,14 +1128,7 @@ impl ParquetSink {
11281128
runtime: &Arc<RuntimeEnv>,
11291129
path: &Path,
11301130
) -> Result<WriterProperties> {
1131-
let schema = if self.parquet_options.global.allow_single_file_parallelism {
1132-
// If parallelizing writes, we may be also be doing hive style partitioning
1133-
// into multiple files which impacts the schema per file.
1134-
// Refer to `get_writer_schema()`
1135-
&get_writer_schema(&self.config)
1136-
} else {
1137-
self.config.output_schema()
1138-
};
1131+
let schema = self.config.output_schema();
11391132

11401133
// TODO: avoid this clone in follow up PR, where the writer properties & schema
11411134
// are calculated once on `ParquetSink::new`
@@ -1249,16 +1242,6 @@ impl FileSink for ParquetSink {
12491242
object_store: Arc<dyn ObjectStore>,
12501243
) -> Result<u64> {
12511244
let parquet_opts = &self.parquet_options;
1252-
let mut allow_single_file_parallelism =
1253-
parquet_opts.global.allow_single_file_parallelism;
1254-
1255-
if parquet_opts.crypto.file_encryption.is_some()
1256-
|| parquet_opts.crypto.factory_id.is_some()
1257-
{
1258-
// For now, arrow-rs does not support parallel writes with encryption
1259-
// See https://github.com/apache/arrow-rs/issues/7359
1260-
allow_single_file_parallelism = false;
1261-
}
12621245

12631246
let mut file_write_tasks: JoinSet<
12641247
std::result::Result<(Path, FileMetaData), DataFusionError>,
@@ -1276,7 +1259,7 @@ impl FileSink for ParquetSink {
12761259

12771260
while let Some((path, mut rx)) = file_stream_rx.recv().await {
12781261
let parquet_props = self.create_writer_props(&runtime, &path).await?;
1279-
if !allow_single_file_parallelism {
1262+
if !parquet_opts.global.allow_single_file_parallelism {
12801263
let mut writer = self
12811264
.create_async_arrow_writer(
12821265
&path,
@@ -1316,6 +1299,7 @@ impl FileSink for ParquetSink {
13161299
.build()?;
13171300
let schema = get_writer_schema(&self.config);
13181301
let props = parquet_props.clone();
1302+
let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
13191303
let parallel_options_clone = parallel_options.clone();
13201304
let pool = Arc::clone(context.memory_pool());
13211305
file_write_tasks.spawn(async move {
@@ -1324,6 +1308,7 @@ impl FileSink for ParquetSink {
13241308
rx,
13251309
schema,
13261310
&props,
1311+
skip_arrow_metadata,
13271312
parallel_options_clone,
13281313
pool,
13291314
)
@@ -1404,13 +1389,10 @@ type ColSender = Sender<ArrowLeafColumn>;
14041389
/// Returns join handles for each columns serialization task along with a send channel
14051390
/// to send arrow arrays to each serialization task.
14061391
fn spawn_column_parallel_row_group_writer(
1407-
schema: Arc<Schema>,
1408-
parquet_props: Arc<WriterProperties>,
1392+
col_writers: Vec<ArrowColumnWriter>,
14091393
max_buffer_size: usize,
14101394
pool: &Arc<dyn MemoryPool>,
14111395
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1412-
let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
1413-
let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
14141396
let num_columns = col_writers.len();
14151397

14161398
let mut col_writer_tasks = Vec::with_capacity(num_columns);
@@ -1505,6 +1487,7 @@ fn spawn_rg_join_and_finalize_task(
15051487
/// across both columns and row_groups, with a theoretical max number of parallel tasks
15061488
/// given by n_columns * num_row_groups.
15071489
fn spawn_parquet_parallel_serialization_task(
1490+
row_group_writer_factory: ArrowRowGroupWriterFactory,
15081491
mut data: Receiver<RecordBatch>,
15091492
serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
15101493
schema: Arc<Schema>,
@@ -1515,13 +1498,11 @@ fn spawn_parquet_parallel_serialization_task(
15151498
SpawnedTask::spawn(async move {
15161499
let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
15171500
let max_row_group_rows = writer_props.max_row_group_size();
1501+
let mut row_group_index = 0;
1502+
let col_writers =
1503+
row_group_writer_factory.create_column_writers(row_group_index)?;
15181504
let (mut column_writer_handles, mut col_array_channels) =
1519-
spawn_column_parallel_row_group_writer(
1520-
Arc::clone(&schema),
1521-
Arc::clone(&writer_props),
1522-
max_buffer_rb,
1523-
&pool,
1524-
)?;
1505+
spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
15251506
let mut current_rg_rows = 0;
15261507

15271508
while let Some(mut rb) = data.recv().await {
@@ -1567,10 +1548,12 @@ fn spawn_parquet_parallel_serialization_task(
15671548
current_rg_rows = 0;
15681549
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
15691550

1551+
row_group_index += 1;
1552+
let col_writers = row_group_writer_factory
1553+
.create_column_writers(row_group_index)?;
15701554
(column_writer_handles, col_array_channels) =
15711555
spawn_column_parallel_row_group_writer(
1572-
Arc::clone(&schema),
1573-
Arc::clone(&writer_props),
1556+
col_writers,
15741557
max_buffer_rb,
15751558
&pool,
15761559
)?;
@@ -1601,29 +1584,21 @@ fn spawn_parquet_parallel_serialization_task(
16011584
/// Consume RowGroups serialized by other parallel tasks and concatenate them in
16021585
/// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
16031586
async fn concatenate_parallel_row_groups(
1587+
mut parquet_writer: SerializedFileWriter<SharedBuffer>,
1588+
merged_buff: SharedBuffer,
16041589
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
1605-
schema: Arc<Schema>,
1606-
writer_props: Arc<WriterProperties>,
16071590
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
16081591
pool: Arc<dyn MemoryPool>,
16091592
) -> Result<FileMetaData> {
1610-
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1611-
16121593
let mut file_reservation =
16131594
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
16141595

1615-
let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
1616-
let mut parquet_writer = SerializedFileWriter::new(
1617-
merged_buff.clone(),
1618-
schema_desc.root_schema_ptr(),
1619-
writer_props,
1620-
)?;
1621-
16221596
while let Some(task) = serialize_rx.recv().await {
16231597
let result = task.join_unwind().await;
1624-
let mut rg_out = parquet_writer.next_row_group()?;
16251598
let (serialized_columns, mut rg_reservation, _cnt) =
16261599
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1600+
1601+
let mut rg_out = parquet_writer.next_row_group()?;
16271602
for chunk in serialized_columns {
16281603
chunk.append_to_row_group(&mut rg_out)?;
16291604
rg_reservation.free();
@@ -1661,6 +1636,7 @@ async fn output_single_parquet_file_parallelized(
16611636
data: Receiver<RecordBatch>,
16621637
output_schema: Arc<Schema>,
16631638
parquet_props: &WriterProperties,
1639+
skip_arrow_metadata: bool,
16641640
parallel_options: ParallelParquetWriterOptions,
16651641
pool: Arc<dyn MemoryPool>,
16661642
) -> Result<FileMetaData> {
@@ -1670,7 +1646,19 @@ async fn output_single_parquet_file_parallelized(
16701646
mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
16711647

16721648
let arc_props = Arc::new(parquet_props.clone());
1649+
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1650+
let options = ArrowWriterOptions::new()
1651+
.with_properties(parquet_props.clone())
1652+
.with_skip_arrow_metadata(skip_arrow_metadata);
1653+
let writer = ArrowWriter::try_new_with_options(
1654+
merged_buff.clone(),
1655+
Arc::clone(&output_schema),
1656+
options,
1657+
)?;
1658+
let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
1659+
16731660
let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1661+
row_group_writer_factory,
16741662
data,
16751663
serialize_tx,
16761664
Arc::clone(&output_schema),
@@ -1679,9 +1667,9 @@ async fn output_single_parquet_file_parallelized(
16791667
Arc::clone(&pool),
16801668
);
16811669
let file_metadata = concatenate_parallel_row_groups(
1670+
writer,
1671+
merged_buff,
16821672
serialize_rx,
1683-
Arc::clone(&output_schema),
1684-
Arc::clone(&arc_props),
16851673
object_store_writer,
16861674
pool,
16871675
)

0 commit comments

Comments
 (0)