From b28ac8cfebf5bb170a94be160636211ccbbc2091 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 13:50:18 -0400
Subject: [PATCH 01/11] Move state machine into ParquetMetadataDecoder

---
 parquet/src/file/metadata/push_decoder.rs | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index 811caf4fd46c..b2b426e1f90c 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -261,7 +261,7 @@ impl ParquetMetaDataPushDecoder {
         &mut self,
         ranges: Vec<Range<u64>>,
         buffers: Vec<Bytes>,
-    ) -> std::result::Result<(), String> {
+    ) -> Result<(), String> {
         if self.done {
             return Err(
                 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
                     .to_string(),
             );
@@ -276,7 +276,7 @@ impl ParquetMetaDataPushDecoder {
     /// decoded metadata or an error if not enough data is available.
     pub fn try_decode(
         &mut self,
-    ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
+    ) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
         if self.done {
             return Ok(DecodeResult::Finished);
         }
@@ -325,6 +325,17 @@ impl ParquetMetaDataPushDecoder {
     }
 }
 
+/// Decoding state machine
+#[derive(Debug)]
+enum DecodeState {
+    Start,
+    NeedsFooter,
+    NeedsMetadata,
+    NeedsPageIndex,
+    // todo read bloom filters?
+    Finished,
+}
+
 // These tests use the arrow writer to create a parquet file in memory
 // so they need the arrow feature and the test feature
 #[cfg(all(test, feature = "arrow"))]

From 8a7a993f16d9be7266779ee3e932405eaebf8292 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 14:31:14 -0400
Subject: [PATCH 02/11] checkpoint

---
 parquet/src/file/metadata/push_decoder.rs | 215 +++++++++++++++-------
 parquet/src/file/metadata/reader.rs       | 130 +++++--------
 2 files changed, 200 insertions(+), 145 deletions(-)

diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index b2b426e1f90c..841e7729e71f 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -15,9 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::basic::ColumnOrder;
 use crate::errors::ParquetError;
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{
+    FileMetaData, FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
+    RowGroupMetaData,
+};
+use crate::file::reader::ChunkReader;
+use crate::file::FOOTER_SIZE;
+use crate::schema::types;
+use crate::schema::types::SchemaDescriptor;
+use crate::thrift::TCompactSliceInputProtocol;
 use crate::DecodeResult;
+use std::fs::metadata;
+use std::sync::Arc;
 
 /// A push decoder for [`ParquetMetaData`].
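+/// A minimal sketch of the intended calling pattern (illustrative only: the
+/// `try_new` constructor name is assumed from the hunks below, and
+/// `fetch_ranges` stands in for whatever I/O the application performs):
+///
+/// ```ignore
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
+/// let metadata = loop {
+///     match decoder.try_decode()? {
+///         // the decoder reports exactly which byte ranges it still needs
+///         DecodeResult::NeedsData(ranges) => {
+///             let buffers = fetch_ranges(&ranges); // caller-provided I/O
+///             decoder.push_ranges(ranges, buffers)?;
+///         }
+///         // all required bytes have been pushed and decoding is complete
+///         DecodeResult::Data(metadata) => break metadata,
+///         DecodeResult::Finished => unreachable!("metadata already returned"),
+///     }
+/// };
+/// ```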
 ///
@@ -192,8 +203,8 @@ use crate::DecodeResult;
 /// [`AsyncRead`]: tokio::io::AsyncRead
 #[derive(Debug)]
 pub struct ParquetMetaDataPushDecoder {
-    done: bool,
-    metadata_reader: ParquetMetaDataReader,
+    state: DecodeState,
+    page_index_policy: PageIndexPolicy,
     buffers: crate::util::push_buffers::PushBuffers,
 }
@@ -211,12 +222,9 @@ impl ParquetMetaDataPushDecoder {
             )));
         };
 
-        let metadata_reader =
-            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
-
         Ok(Self {
-            done: false,
-            metadata_reader,
+            state: DecodeState::ReadingFooter,
+            page_index_policy: PageIndexPolicy::Optional,
             buffers: crate::util::push_buffers::PushBuffers::new(file_len),
         })
     }
@@ -232,9 +240,7 @@ impl ParquetMetaDataPushDecoder {
     ///
     /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
     pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
-        self.metadata_reader = self
-            .metadata_reader
-            .with_page_index_policy(page_index_policy);
+        self.page_index_policy = page_index_policy;
         self
     }
@@ -261,12 +267,11 @@ impl ParquetMetaDataPushDecoder {
         &mut self,
         ranges: Vec<Range<u64>>,
         buffers: Vec<Bytes>,
-    ) -> Result<(), String> {
-        if self.done {
-            return Err(
+    ) -> Result<(), ParquetError> {
+        if matches!(&self.state, DecodeState::Finished) {
+            return Err(general_err!(
                 "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
-                    .to_string(),
-            );
+            ));
         }
         self.buffers.push_ranges(ranges, buffers);
         Ok(())
     }
 
     /// Try to decode the metadata from the pushed data, returning the
     /// decoded metadata or an error if not enough data is available.
-    pub fn try_decode(
-        &mut self,
-    ) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
-        if self.done {
-            return Ok(DecodeResult::Finished);
-        }
-
-        // need to have the last 8 bytes of the file to decode the metadata
+    pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
         let file_len = self.buffers.file_len();
-        if !self.buffers.has_range(&(file_len - 8..file_len)) {
-            #[expect(clippy::single_range_in_vec_init)]
-            return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+        let footer_len = FOOTER_SIZE as u64;
+        loop {
+            match self.state {
+                DecodeState::ReadingFooter => {
+                    // need to have the last 8 bytes of the file to decode the metadata
+                    let footer_start = file_len.saturating_sub(footer_len);
+
+                    let footer_range = footer_start..file_len;
+                    if !self.buffers.has_range(&footer_range) {
+                        #[expect(clippy::single_range_in_vec_init)]
+                        return Ok(DecodeResult::NeedsData(vec![footer_range]));
+                    }
+                    // decode footer (we just checked we have the bytes)
+                    let footer_bytes = self.buffers.get_bytes(footer_range.start, FOOTER_SIZE)?;
+                    let footer_bytes: [u8; FOOTER_SIZE] = footer_bytes
+                        .as_ref()
+                        .try_into()
+                        .expect("checked length above");
+                    let footer_tail = FooterTail::try_from(footer_bytes)?;
+
+                    #[cfg(not(feature = "encryption"))]
+                    if footer_tail.is_encrypted_footer() {
+                        return Err(general_err!(
+                            "Parquet file has an encrypted footer but the encryption feature is disabled"
+                        ));
+                    };
+
+                    self.state = DecodeState::ReadingMetadata(footer_tail);
+                    continue;
+                }
+
+                DecodeState::ReadingMetadata(footer_tail) => {
+                    let metadata_len: u64 = footer_tail.metadata_length() as u64;
+                    let metadata_start = self.buffers.file_len() - footer_len - metadata_len;
+
+                    let metadata_range = metadata_start..(metadata_start + metadata_len);
+                    // todo: refactor this range checking logic into a function
+                    if !self.buffers.has_range(&metadata_range) {
#[expect(clippy::single_range_in_vec_init)] + return Ok(DecodeResult::NeedsData(vec![metadata_range])); + } + + let metadata_bytes = self + .buffers + .get_bytes(metadata_range.start, footer_tail.metadata_length())?; + let metadata = Self::decode_metadata(&metadata_bytes)?; + self.state = DecodeState::ReadingPageIndex(metadata); + continue; + } + + DecodeState::ReadingPageIndex(metadata) => { + let index_required = match self.page_index_policy { + PageIndexPolicy::Skip => { + self.state = DecodeState::Finished(metadata); + continue; + } + PageIndexPolicy::Optional => false, + PageIndexPolicy::Required => true, + }; + } + + DecodeState::Finished => return Ok(DecodeResult::Finished), + } } + } - // Try to parse the metadata from the buffers we have. - // - // If we don't have enough data, returns a `ParquetError::NeedMoreData` - // with the number of bytes needed to complete the metadata parsing. - // - // If we have enough data, returns `Ok(())` and we can complete - // the metadata parsing. - let maybe_metadata = self - .metadata_reader - .try_parse_sized(&self.buffers, self.buffers.file_len()); - - match maybe_metadata { - Ok(()) => { - // Metadata successfully parsed, proceed to decode the row groups - let metadata = self.metadata_reader.finish()?; - self.done = true; - Ok(DecodeResult::Data(metadata)) - } + /// Decodes [`ParquetMetaData`] from the provided bytes. + /// + /// Typically this is used to decode the metadata from the end of a parquet + /// file. The format of `buf` is the Thrift compact binary protocol, as specified + /// by the [Parquet Spec]. + /// + /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata + pub fn decode_metadata(buf: &[u8]) -> crate::errors::Result { + let mut prot = TCompactSliceInputProtocol::new(buf); + + let t_file_metadata: crate::format::FileMetaData = + crate::format::FileMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| general_err!("Could not parse metadata: {}", e))?; + let schema = types::from_thrift(&t_file_metadata.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(schema)); + + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); + } + let column_orders = + Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_descr, + column_orders, + ); + + Ok(ParquetMetaData::new(file_metadata, row_groups)) + } - Err(ParquetError::NeedMoreData(needed)) => { - let needed = needed as u64; - let Some(start_offset) = file_len.checked_sub(needed) else { - return Err(ParquetError::General(format!( - "Parquet metadata reader needs at least {needed} bytes, but file length is only {file_len}" - ))); + /// Parses column orders from Thrift definition. + /// If no column orders are defined, returns `None`. 
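+    /// (Illustrative: for the two-column INT32/FLOAT schema exercised by the
+    /// tests later in this series, two thrift `TYPEORDER` entries decode to
+    /// `Some(vec![TYPE_DEFINED_ORDER(SIGNED), TYPE_DEFINED_ORDER(SIGNED)])`.)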
+    fn parse_column_orders(
+        t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
+        schema_descr: &SchemaDescriptor,
+    ) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
+        match t_column_orders {
+            Some(orders) => {
+                // Should always be the case
+                if orders.len() != schema_descr.num_columns() {
+                    return Err(general_err!("Column order length mismatch"));
                 };
-                let needed_range = start_offset..start_offset + needed;
-                // needs `needed_range` bytes at the end of the file
-                Ok(DecodeResult::NeedsData(vec![needed_range]))
+                let mut res = Vec::new();
+                for (i, column) in schema_descr.columns().iter().enumerate() {
+                    match orders[i] {
+                        crate::format::ColumnOrder::TYPEORDER(_) => {
+                            let sort_order = ColumnOrder::get_sort_order(
+                                column.logical_type(),
+                                column.converted_type(),
+                                column.physical_type(),
+                            );
+                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
+                        }
+                    }
+                }
+                Ok(Some(res))
             }
-            Err(ParquetError::NeedMoreDataRange(range)) => Ok(DecodeResult::NeedsData(vec![range])),
-
-            Err(e) => Err(e), // some other error, pass back
+            None => Ok(None),
         }
     }
 }
 
 /// Decoding state machine
 #[derive(Debug)]
 enum DecodeState {
-    Start,
-    NeedsFooter,
-    NeedsMetadata,
-    NeedsPageIndex,
+    /// Reading the last 8 bytes of the file
+    ReadingFooter,
+    /// Reading the metadata thrift structure
+    ReadingMetadata(FooterTail),
+    //
+    ReadingPageIndex(ParquetMetaData),
     // todo read bloom filters?
-    Finished,
+    Finished(ParquetMetaData),
 }
 
 // These tests use the arrow writer to create a parquet file in memory
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 8d92d1e0aa8d..4c1d3fe187d0 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -26,7 +26,10 @@ use crate::encryption::{
 use bytes::Bytes;
 
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
+use crate::file::metadata::{
+    ColumnChunkMetaData, FileMetaData, ParquetMetaData, ParquetMetaDataPushDecoder,
+    RowGroupMetaData,
+};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::reader::ChunkReader;
@@ -103,15 +106,45 @@ impl From<bool> for PageIndexPolicy {
     }
 }
 
-/// Describes how the footer metadata is stored
+/// Describes how the Parquet footer metadata is stored
 ///
 /// This is parsed from the last 8 bytes of the Parquet file
+///
+/// There are 8 bytes at the end of the Parquet footer with the following layout:
+/// * 4 bytes for the metadata length
+/// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
+///
+/// ```text
+/// +-----+------------------+
+/// | len | 'PAR1' or 'PARE' |
+/// +-----+------------------+
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct FooterTail {
     metadata_length: usize,
     encrypted_footer: bool,
 }
 
 impl FooterTail {
+    /// Try to decode the footer tail from the given 8 bytes
+    fn try_new(slice: &[u8; FOOTER_SIZE]) -> Result<Self> {
+        let magic = &slice[4..];
+        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
+            true
+        } else if magic == PARQUET_MAGIC {
+            false
+        } else {
+            return Err(general_err!("Invalid Parquet file. 
Corrupt footer")); + }; + // get the metadata length from the footer + let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap()); + Ok(FooterTail { + // u32 won't be larger than usize in most cases + metadata_length: metadata_len as usize, + encrypted_footer, + }) + } + /// The length of the footer metadata in bytes pub fn metadata_length(&self) -> usize { self.metadata_length @@ -123,6 +156,14 @@ impl FooterTail { } } +impl TryFrom<[u8; FOOTER_SIZE]> for FooterTail { + type Error = ParquetError; + + fn try_from(value: [u8; FOOTER_SIZE]) -> Result { + Self::try_new(&value) + } +} + impl ParquetMetaDataReader { /// Create a new [`ParquetMetaDataReader`] pub fn new() -> Self { @@ -873,33 +914,10 @@ impl ParquetMetaDataReader { } } - /// Decodes the end of the Parquet footer - /// - /// There are 8 bytes at the end of the Parquet footer with the following layout: - /// * 4 bytes for the metadata length - /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer) - /// - /// ```text - /// +-----+------------------+ - /// | len | 'PAR1' or 'PARE' | - /// +-----+------------------+ - /// ``` + /// Decodes a [`FooterTail`] from the provided 8-byte slice. + // todo deprecate pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result { - let magic = &slice[4..]; - let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER { - true - } else if magic == PARQUET_MAGIC { - false - } else { - return Err(general_err!("Invalid Parquet file. Corrupt footer")); - }; - // get the metadata length from the footer - let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap()); - Ok(FooterTail { - // u32 won't be larger than usize in most cases - metadata_length: metadata_len as usize, - encrypted_footer, - }) + FooterTail::try_new(slice) } /// Decodes the Parquet footer, returning the metadata length in bytes @@ -1057,61 +1075,7 @@ impl ParquetMetaDataReader { /// /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata(buf: &[u8]) -> Result { - let mut prot = TCompactSliceInputProtocol::new(buf); - - let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot) - .map_err(|e| general_err!("Could not parse metadata: {}", e))?; - let schema = types::from_thrift(&t_file_metadata.schema)?; - let schema_descr = Arc::new(SchemaDescriptor::new(schema)); - - let mut row_groups = Vec::new(); - for rg in t_file_metadata.row_groups { - row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); - } - let column_orders = - Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; - - let file_metadata = FileMetaData::new( - t_file_metadata.version, - t_file_metadata.num_rows, - t_file_metadata.created_by, - t_file_metadata.key_value_metadata, - schema_descr, - column_orders, - ); - - Ok(ParquetMetaData::new(file_metadata, row_groups)) - } - - /// Parses column orders from Thrift definition. - /// If no column orders are defined, returns `None`. 
- fn parse_column_orders( - t_column_orders: Option>, - schema_descr: &SchemaDescriptor, - ) -> Result>> { - match t_column_orders { - Some(orders) => { - // Should always be the case - if orders.len() != schema_descr.num_columns() { - return Err(general_err!("Column order length mismatch")); - }; - let mut res = Vec::new(); - for (i, column) in schema_descr.columns().iter().enumerate() { - match orders[i] { - TColumnOrder::TYPEORDER(_) => { - let sort_order = ColumnOrder::get_sort_order( - column.logical_type(), - column.converted_type(), - column.physical_type(), - ); - res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); - } - } - } - Ok(Some(res)) - } - None => Ok(None), - } + ParquetMetaDataPushDecoder::decode_metadata(buf) } } From 719bcb40d610c07a28f4c49416233d60f688c7d6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Sep 2025 15:33:44 -0400 Subject: [PATCH 03/11] Move code around --- parquet/src/file/metadata/mod.rs | 1 + parquet/src/file/metadata/parser.rs | 161 ++++++++++++++++++++++ parquet/src/file/metadata/push_decoder.rs | 90 ++++++++---- parquet/src/file/metadata/reader.rs | 69 +++------- 4 files changed, 239 insertions(+), 82 deletions(-) create mode 100644 parquet/src/file/metadata/parser.rs diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index f90143104ce2..fae2136fd38b 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -91,6 +91,7 @@ //! * Same name, different struct //! ``` mod memory; +mod parser; mod push_decoder; pub(crate) mod reader; mod writer; diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs new file mode 100644 index 000000000000..12eeb2e0591a --- /dev/null +++ b/parquet/src/file/metadata/parser.rs @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Internal metadata parsing routines +//! +//! In general these functions parse thrift-encoded metadata from a byte slice +//! into the corresponding Rust structures + +use std::sync::Arc; +use bytes::Bytes; +use crate::basic::ColumnOrder; +use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData}; +use crate::schema::types; +use crate::schema::types::SchemaDescriptor; +use crate::thrift::TCompactSliceInputProtocol; +use crate::thrift::TSerializable; +use crate::errors::{ParquetError, Result}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; + +/// Decodes [`ParquetMetaData`] from the provided bytes. +/// +/// Typically this is used to decode the metadata from the end of a parquet +/// file. 
The format of `buf` is the Thrift compact binary protocol, as specified +/// by the [Parquet Spec]. +/// +/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata +pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result { + let mut prot = TCompactSliceInputProtocol::new(buf); + + let t_file_metadata: crate::format::FileMetaData = + crate::format::FileMetaData::read_from_in_protocol(&mut prot) + .map_err(|e| general_err!("Could not parse metadata: {}", e))?; + let schema = types::from_thrift(&t_file_metadata.schema)?; + let schema_descr = Arc::new(SchemaDescriptor::new(schema)); + + let mut row_groups = Vec::new(); + for rg in t_file_metadata.row_groups { + row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); + } + let column_orders = + parse_column_orders(t_file_metadata.column_orders, &schema_descr)?; + + let file_metadata = FileMetaData::new( + t_file_metadata.version, + t_file_metadata.num_rows, + t_file_metadata.created_by, + t_file_metadata.key_value_metadata, + schema_descr, + column_orders, + ); + + Ok(ParquetMetaData::new(file_metadata, row_groups)) +} + +/// Parses column orders from Thrift definition. +/// If no column orders are defined, returns `None`. +pub(crate) fn parse_column_orders( + t_column_orders: Option>, + schema_descr: &SchemaDescriptor, +) -> crate::errors::Result>> { + match t_column_orders { + Some(orders) => { + // Should always be the case + if orders.len() != schema_descr.num_columns() { + return Err(general_err!("Column order length mismatch")); + }; + let mut res = Vec::new(); + for (i, column) in schema_descr.columns().iter().enumerate() { + match orders[i] { + crate::format::ColumnOrder::TYPEORDER(_) => { + let sort_order = ColumnOrder::get_sort_order( + column.logical_type(), + column.converted_type(), + column.physical_type(), + ); + res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); + } + } + } + Ok(Some(res)) + } + None => Ok(None), + } +} + +#[cfg(test)] +mod test { + use crate::format::TypeDefinedOrder; +use crate::basic::{SortOrder, Type}; + use crate::format::ColumnOrder as TColumnOrder; +use super::*; + use crate::file::metadata::SchemaType; + #[test] + fn test_metadata_column_orders_parse() { + // Define simple schema, we do not need to provide logical types. + let fields = vec![ + Arc::new( + SchemaType::primitive_type_builder("col1", Type::INT32) + .build() + .unwrap(), + ), + Arc::new( + SchemaType::primitive_type_builder("col2", Type::FLOAT) + .build() + .unwrap(), + ), + ]; + let schema = SchemaType::group_type_builder("schema") + .with_fields(fields) + .build() + .unwrap(); + let schema_descr = SchemaDescriptor::new(Arc::new(schema)); + + let t_column_orders = Some(vec![ + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), + ]); + + assert_eq!( + parse_column_orders(t_column_orders, &schema_descr).unwrap(), + Some(vec![ + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), + ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) + ]) + ); + + // Test when no column orders are defined. 
+ assert_eq!( + parse_column_orders(None, &schema_descr).unwrap(), + None + ); + } + + #[test] + fn test_metadata_column_orders_len_mismatch() { + let schema = SchemaType::group_type_builder("schema").build().unwrap(); + let schema_descr = SchemaDescriptor::new(Arc::new(schema)); + + let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); + + let res = parse_column_orders(t_column_orders, &schema_descr); + assert!(res.is_err()); + assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch")); + } +} \ No newline at end of file diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 841e7729e71f..24025ec1578b 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -21,13 +21,16 @@ use crate::file::metadata::{ FileMetaData, FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, }; +use crate::file::page_index::index_reader::acc_range; use crate::file::reader::ChunkReader; use crate::file::FOOTER_SIZE; use crate::schema::types; use crate::schema::types::SchemaDescriptor; use crate::thrift::TCompactSliceInputProtocol; +use crate::thrift::TSerializable; use crate::DecodeResult; use std::fs::metadata; +use std::ops::Range; use std::sync::Arc; /// A push decoder for [`ParquetMetaData`]. @@ -203,11 +206,26 @@ use std::sync::Arc; /// [`AsyncRead`]: tokio::io::AsyncRead #[derive(Debug)] pub struct ParquetMetaDataPushDecoder { + /// Decoding state state: DecodeState, - page_index_policy: PageIndexPolicy, + /// policy for loading ColumnIndex (part of the PageIndex) + column_index_policy: PageIndexPolicy, + /// policy for loading OffsetIndex (part of the PageIndex) + offset_index_policy: PageIndexPolicy, + /// Underyling buffers buffers: crate::util::push_buffers::PushBuffers, } +// Macro that checks if the ranges are in buffer, and if not returns NeedsData +macro_rules! ensure_range { + ($buffers:expr, $range:expr) => { + if !$buffers.has_range(&$range) { + #[expect(clippy::single_range_in_vec_init)] + return Ok(DecodeResult::NeedsData(vec![$range])); + } + }; +} + impl ParquetMetaDataPushDecoder { /// Create a new `ParquetMetaDataPushDecoder` with the given file length. 
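+    /// (Illustrative construction, assuming the fallible `try_new` constructor
+    /// whose body is shown below; `file_len` must be the total file size in
+    /// bytes so the decoder can locate the footer:)
+    ///
+    /// ```ignore
+    /// let decoder = ParquetMetaDataPushDecoder::try_new(file_len)?
+    ///     .with_page_index_policy(PageIndexPolicy::Required);
+    /// ```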
/// @@ -224,7 +242,8 @@ impl ParquetMetaDataPushDecoder { Ok(Self { state: DecodeState::ReadingFooter, - page_index_policy: PageIndexPolicy::Optional, + column_index_policy: PageIndexPolicy::Optional, + offset_index_policy: PageIndexPolicy::Optional, buffers: crate::util::push_buffers::PushBuffers::new(file_len), }) } @@ -240,7 +259,8 @@ impl ParquetMetaDataPushDecoder { /// /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self { - self.page_index_policy = page_index_policy; + self.column_index_policy = page_index_policy; + self.offset_index_policy = page_index_policy; self } @@ -288,18 +308,10 @@ impl ParquetMetaDataPushDecoder { // need to have the last 8 bytes of the file to decode the metadata let footer_start = file_len.saturating_sub(footer_len); - let footer_range = footer_start..file_len; - if !self.buffers.has_range(&footer_range) { - #[expect(clippy::single_range_in_vec_init)] - return Ok(DecodeResult::NeedsData(vec![footer_range])); - } + ensure_range!(&self.buffers, footer_start..file_len); // decode footer (we just checked we have the bytes) - let footer_bytes = self.buffers.get_bytes(footer_range.start, FOOTER_SIZE)?; - let footer_bytes: [u8; FOOTER_SIZE] = footer_bytes - .as_ref() - .try_into() - .expect("checked length above"); - let footer_tail = FooterTail::try_from(footer_bytes)?; + let footer_bytes = self.buffers.get_bytes(footer_start, FOOTER_SIZE)?; + let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?; #[cfg(not(feature = "encryption"))] if footer_tail.is_encrypted_footer() { @@ -316,30 +328,28 @@ impl ParquetMetaDataPushDecoder { let metadata_len: u64 = footer_tail.metadata_length() as u64; let metadata_start = self.buffers.file_len() - footer_len - metadata_len; - let metadata_range = metadata_start..(metadata_start + metadata_len); - // todo: refactor this range checking logic into a function - if !self.buffers.has_range(&metadata_range) { - #[expect(clippy::single_range_in_vec_init)] - return Ok(DecodeResult::NeedsData(vec![metadata_range])); - } + ensure_range!(&self.buffers, metadata_start..metadata_len); let metadata_bytes = self .buffers - .get_bytes(metadata_range.start, footer_tail.metadata_length())?; + .get_bytes(metadata_start, footer_tail.metadata_length())?; let metadata = Self::decode_metadata(&metadata_bytes)?; self.state = DecodeState::ReadingPageIndex(metadata); continue; } DecodeState::ReadingPageIndex(metadata) => { - let index_required = match self.page_index_policy { - PageIndexPolicy::Skip => { - self.state = DecodeState::Finished(metadata); - continue; - } - PageIndexPolicy::Optional => false, - PageIndexPolicy::Required => true, + let range = range_for_page_index( + &metadata, + &self.column_index_policy, + &self.offset_index_policy, + ); + let Some(range) = range else { + // no ranges means no page indexes are needed + self.state = DecodeState::Finished; + return Ok(DecodeResult::Data(metadata)); }; + ensure_range!(&self.buffers, range); } DecodeState::Finished => return Ok(DecodeResult::Finished), @@ -424,7 +434,29 @@ enum DecodeState { // ReadingPageIndex(ParquetMetaData), // todo read boom filters? 
- Finished(ParquetMetaData), + Finished, +} + +/// Returns the byte range needed to read the offset/page indexes, based on the +/// policies +/// +/// Returns None if no page indexes are needed +pub(crate) fn range_for_page_index( + metadata: &ParquetMetaData, + column_index_policy: &PageIndexPolicy, + offset_index_policy: &PageIndexPolicy, +) -> Option> { + // Get bounds needed for page indexes (if any are present in the file). + let mut range = None; + for c in metadata.row_groups().iter().flat_map(|r| r.columns()) { + if column_index_policy != &PageIndexPolicy::Skip { + range = acc_range(range, c.column_index_range()); + } + if offset_index_policy != &PageIndexPolicy::Skip { + range = acc_range(range, c.offset_index_range()); + } + } + range } // These tests use the arrow writer to create a parquet file in memory diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 4c1d3fe187d0..0fadfebab22f 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -164,6 +164,22 @@ impl TryFrom<[u8; FOOTER_SIZE]> for FooterTail { } } +impl TryFrom<&[u8]> for FooterTail { + type Error = ParquetError; + + fn try_from(value: &[u8]) -> Result { + if value.len() != FOOTER_SIZE { + return Err(general_err!( + "Invalid footer length {}, expected {}", + value.len(), + FOOTER_SIZE + )); + } + let slice: &[u8; FOOTER_SIZE] = value.try_into().unwrap(); + Self::try_new(slice) + } +} + impl ParquetMetaDataReader { /// Create a new [`ParquetMetaDataReader`] pub fn new() -> Self { @@ -1149,59 +1165,6 @@ mod tests { assert!(matches!(err, ParquetError::NeedMoreData(263))); } - #[test] - fn test_metadata_column_orders_parse() { - // Define simple schema, we do not need to provide logical types. - let fields = vec![ - Arc::new( - SchemaType::primitive_type_builder("col1", Type::INT32) - .build() - .unwrap(), - ), - Arc::new( - SchemaType::primitive_type_builder("col2", Type::FLOAT) - .build() - .unwrap(), - ), - ]; - let schema = SchemaType::group_type_builder("schema") - .with_fields(fields) - .build() - .unwrap(); - let schema_descr = SchemaDescriptor::new(Arc::new(schema)); - - let t_column_orders = Some(vec![ - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), - TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), - ]); - - assert_eq!( - ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(), - Some(vec![ - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), - ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) - ]) - ); - - // Test when no column orders are defined. 
- assert_eq!( - ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(), - None - ); - } - - #[test] - fn test_metadata_column_orders_len_mismatch() { - let schema = SchemaType::group_type_builder("schema").build().unwrap(); - let schema_descr = SchemaDescriptor::new(Arc::new(schema)); - - let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); - - let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr); - assert!(res.is_err()); - assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch")); - } - #[test] #[allow(deprecated)] fn test_try_parse() { From fbb879ababad163c544a87dbc1ff266ac86c7025 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Sep 2025 15:41:06 -0400 Subject: [PATCH 04/11] checkpoint --- parquet/src/file/metadata/parser.rs | 32 +++++++++-------------- parquet/src/file/metadata/push_decoder.rs | 14 ++++++++-- parquet/src/file/metadata/reader.rs | 13 +++------ 3 files changed, 28 insertions(+), 31 deletions(-) diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs index 12eeb2e0591a..46069a715720 100644 --- a/parquet/src/file/metadata/parser.rs +++ b/parquet/src/file/metadata/parser.rs @@ -20,18 +20,16 @@ //! In general these functions parse thrift-encoded metadata from a byte slice //! into the corresponding Rust structures -use std::sync::Arc; -use bytes::Bytes; use crate::basic::ColumnOrder; -use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData}; +use crate::errors::ParquetError; +use crate::file::metadata::{ + ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData, +}; use crate::schema::types; use crate::schema::types::SchemaDescriptor; use crate::thrift::TCompactSliceInputProtocol; use crate::thrift::TSerializable; -use crate::errors::{ParquetError, Result}; -use crate::file::page_index::index::Index; -use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index}; -use crate::file::page_index::offset_index::OffsetIndexMetaData; +use std::sync::Arc; /// Decodes [`ParquetMetaData`] from the provided bytes. /// @@ -53,8 +51,7 @@ pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result crate::errors::Result>, schema_descr: &SchemaDescriptor, ) -> crate::errors::Result>> { @@ -101,11 +98,11 @@ pub(crate) fn parse_column_orders( #[cfg(test)] mod test { - use crate::format::TypeDefinedOrder; -use crate::basic::{SortOrder, Type}; - use crate::format::ColumnOrder as TColumnOrder; -use super::*; + use super::*; + use crate::basic::{SortOrder, Type}; use crate::file::metadata::SchemaType; + use crate::format::ColumnOrder as TColumnOrder; + use crate::format::TypeDefinedOrder; #[test] fn test_metadata_column_orders_parse() { // Define simple schema, we do not need to provide logical types. @@ -141,10 +138,7 @@ use super::*; ); // Test when no column orders are defined. 
- assert_eq!( - parse_column_orders(None, &schema_descr).unwrap(), - None - ); + assert_eq!(parse_column_orders(None, &schema_descr).unwrap(), None); } #[test] @@ -158,4 +152,4 @@ use super::*; assert!(res.is_err()); assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch")); } -} \ No newline at end of file +} diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 24025ec1578b..3714e68dcc2c 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -29,7 +29,6 @@ use crate::schema::types::SchemaDescriptor; use crate::thrift::TCompactSliceInputProtocol; use crate::thrift::TSerializable; use crate::DecodeResult; -use std::fs::metadata; use std::ops::Range; use std::sync::Arc; @@ -303,7 +302,7 @@ impl ParquetMetaDataPushDecoder { let file_len = self.buffers.file_len(); let footer_len = FOOTER_SIZE as u64; loop { - match self.state { + match std::mem::replace(&mut self.state, DecodeState::Intermediate) { DecodeState::ReadingFooter => { // need to have the last 8 bytes of the file to decode the metadata let footer_start = file_len.saturating_sub(footer_len); @@ -350,9 +349,17 @@ impl ParquetMetaDataPushDecoder { return Ok(DecodeResult::Data(metadata)); }; ensure_range!(&self.buffers, range); + return Err(general_err!( + "ParquetMetaDataPushDecoder: page index reading not yet implemented" + )); } DecodeState::Finished => return Ok(DecodeResult::Finished), + DecodeState::Intermediate => { + return Err(general_err!( + "ParquetMetaDataPushDecoder: internal error, invalid state" + )); + } } } } @@ -435,6 +442,9 @@ enum DecodeState { ReadingPageIndex(ParquetMetaData), // todo read boom filters? Finished, + /// State left during decoding. This should never be observed outside + /// of the try_decode method + Intermediate, } /// Returns the byte range needed to read the offset/page indexes, based on the diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 0fadfebab22f..e80cf76d20de 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -15,9 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use std::{io::Read, ops::Range, sync::Arc}; +use std::{io::Read, ops::Range}; -use crate::basic::ColumnOrder; #[cfg(feature = "encryption")] use crate::encryption::{ decrypt::{FileDecryptionProperties, FileDecryptor}, @@ -26,20 +25,14 @@ use crate::encryption::{ use bytes::Bytes; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ - ColumnChunkMetaData, FileMetaData, ParquetMetaData, ParquetMetaDataPushDecoder, - RowGroupMetaData, -}; +use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataPushDecoder}; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use crate::file::reader::ChunkReader; use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; -use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData}; #[cfg(feature = "encryption")] use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData}; -use crate::schema::types; -use crate::schema::types::SchemaDescriptor; -use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; +use crate::thrift::TSerializable; #[cfg(all(feature = "async", feature = "arrow"))] use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch}; From 01871eed234293419864d022645168279b0e1e54 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Sep 2025 16:28:52 -0400 Subject: [PATCH 05/11] remove dead code --- parquet/src/file/metadata/push_decoder.rs | 69 +---------------------- parquet/src/file/metadata/reader.rs | 2 +- 2 files changed, 3 insertions(+), 68 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 3714e68dcc2c..b731d413b631 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -17,6 +17,7 @@ use crate::basic::ColumnOrder; use crate::errors::ParquetError; +use crate::file::metadata::parser::decode_metadata; use crate::file::metadata::{ FileMetaData, FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, @@ -332,7 +333,7 @@ impl ParquetMetaDataPushDecoder { let metadata_bytes = self .buffers .get_bytes(metadata_start, footer_tail.metadata_length())?; - let metadata = Self::decode_metadata(&metadata_bytes)?; + let metadata = decode_metadata(&metadata_bytes)?; self.state = DecodeState::ReadingPageIndex(metadata); continue; } @@ -363,72 +364,6 @@ impl ParquetMetaDataPushDecoder { } } } - - /// Decodes [`ParquetMetaData`] from the provided bytes. - /// - /// Typically this is used to decode the metadata from the end of a parquet - /// file. The format of `buf` is the Thrift compact binary protocol, as specified - /// by the [Parquet Spec]. 
-    ///
-    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-    pub fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
-        let mut prot = TCompactSliceInputProtocol::new(buf);
-
-        let t_file_metadata: crate::format::FileMetaData =
-            crate::format::FileMetaData::read_from_in_protocol(&mut prot)
-                .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-        let schema = types::from_thrift(&t_file_metadata.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
-        let mut row_groups = Vec::new();
-        for rg in t_file_metadata.row_groups {
-            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
-        }
-        let column_orders =
-            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
-
-        let file_metadata = FileMetaData::new(
-            t_file_metadata.version,
-            t_file_metadata.num_rows,
-            t_file_metadata.created_by,
-            t_file_metadata.key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
-
-        Ok(ParquetMetaData::new(file_metadata, row_groups))
-    }
-
-    /// Parses column orders from Thrift definition.
-    /// If no column orders are defined, returns `None`.
-    fn parse_column_orders(
-        t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
-        schema_descr: &SchemaDescriptor,
-    ) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
-        match t_column_orders {
-            Some(orders) => {
-                // Should always be the case
-                if orders.len() != schema_descr.num_columns() {
-                    return Err(general_err!("Column order length mismatch"));
-                };
-                let mut res = Vec::new();
-                for (i, column) in schema_descr.columns().iter().enumerate() {
-                    match orders[i] {
-                        crate::format::ColumnOrder::TYPEORDER(_) => {
-                            let sort_order = ColumnOrder::get_sort_order(
-                                column.logical_type(),
-                                column.converted_type(),
-                                column.physical_type(),
-                            );
-                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
-                        }
-                    }
-                }
-                Ok(Some(res))
-            }
-            None => Ok(None),
-        }
-    }
 }
 
 /// Decoding state machine
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index e80cf76d20de..03f38e168593 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -1084,7 +1084,7 @@ impl ParquetMetaDataReader {
     ///
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
-        ParquetMetaDataPushDecoder::decode_metadata(buf)
+        crate::file::metadata::parser::decode_metadata(buf)
     }
 }

From b77c6f57fb4aa9058c5e18323c4bce80243ce6e5 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 16:31:43 -0400
Subject: [PATCH 06/11] Remove more redundancy

---
 parquet/src/file/metadata/push_decoder.rs | 12 ++++++------
 parquet/src/file/metadata/reader.rs       | 21 +++++++--------------
 2 files changed, 13 insertions(+), 20 deletions(-)

diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index b731d413b631..235b5ab78eca 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -341,8 +341,8 @@ impl ParquetMetaDataPushDecoder {
                 DecodeState::ReadingPageIndex(metadata) => {
                     let range = range_for_page_index(
                         &metadata,
-                        &self.column_index_policy,
-                        &self.offset_index_policy,
+                        self.column_index_policy,
+                        self.offset_index_policy,
                     );
@@ -388,16 +388,16 @@ enum DecodeState {
 /// Returns the byte range needed to read the offset/page indexes, based on the
 /// policies
 ///
 /// Returns None if no page indexes are needed
 pub(crate) fn range_for_page_index(
     metadata: &ParquetMetaData,
-    column_index_policy: &PageIndexPolicy,
-    offset_index_policy: &PageIndexPolicy,
+    column_index_policy: PageIndexPolicy,
+    offset_index_policy: PageIndexPolicy,
 ) -> Option<Range<u64>> {
     // Get bounds needed for page indexes (if any are present in the file).
     let mut range = None;
     for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
-        if column_index_policy != &PageIndexPolicy::Skip {
+        if column_index_policy != PageIndexPolicy::Skip {
             range = acc_range(range, c.column_index_range());
         }
-        if offset_index_policy != &PageIndexPolicy::Skip {
+        if offset_index_policy != PageIndexPolicy::Skip {
             range = acc_range(range, c.offset_index_range());
         }
     }
     range
 }
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 03f38e168593..498567b80bab 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -38,6 +38,7 @@ use crate::thrift::TSerializable;
 use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::CryptoContext;
+use crate::file::metadata::push_decoder::range_for_page_index;
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 
 /// Reads the [`ParquetMetaData`] from a byte stream.
@@ -749,20 +750,12 @@ impl ParquetMetaDataReader {
 
     fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
-        self.metadata.as_ref()?;
-
-        // Get bounds needed for page indexes (if any are present in the file).
-        let mut range = None;
-        let metadata = self.metadata.as_ref().unwrap();
-        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
-            if self.column_index != PageIndexPolicy::Skip {
-                range = acc_range(range, c.column_index_range());
-            }
-            if self.offset_index != PageIndexPolicy::Skip {
-                range = acc_range(range, c.offset_index_range());
-            }
-        }
-        range
+        let metadata = self.metadata.as_ref()?;
+        range_for_page_index(
+            metadata,
+            self.column_index,
+            self.offset_index,
+        )
     }
 
     // One-shot parse of footer.

From 93c08d3ac59d30598e648a321e2a50a905f63ad9 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 17:04:12 -0400
Subject: [PATCH 07/11] move more code

---
 parquet/src/file/metadata/parser.rs | 166 ++++++++++++++++++++++
 parquet/src/file/metadata/reader.rs | 164 +---------------------
 2 files changed, 172 insertions(+), 158 deletions(-)

diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
index 46069a715720..89d7073809fa 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -25,10 +25,14 @@
 use crate::errors::ParquetError;
 use crate::file::metadata::{
     ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData,
 };
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::schema::types;
 use crate::schema::types::SchemaDescriptor;
 use crate::thrift::TCompactSliceInputProtocol;
 use crate::thrift::TSerializable;
+use bytes::Bytes;
 use std::sync::Arc;
 
 /// Decodes [`ParquetMetaData`] from the provided bytes.
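+/// (Illustrative call, assuming `metadata_bytes` was already sliced out of the
+/// file using the length recorded in the footer tail, and excludes the final
+/// 8-byte tail itself:)
+///
+/// ```ignore
+/// let metadata = decode_metadata(metadata_bytes)?;
+/// println!("row groups: {}", metadata.num_row_groups());
+/// ```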
@@ -96,6 +100,168 @@ pub(crate) fn parse_column_orders( } } +pub(crate) fn parse_column_index( + metadata: &mut ParquetMetaData, + column_index_policy: PageIndexPolicy, + bytes: &Bytes, + start_offset: u64, +) -> crate::errors::Result<()> { + if column_index_policy == PageIndexPolicy::Skip { + return Ok(()); + } + let index = metadata + .row_groups() + .iter() + .enumerate() + .map(|(rg_idx, x)| { + x.columns() + .iter() + .enumerate() + .map(|(col_idx, c)| match c.column_index_range() { + Some(r) => { + let r_start = usize::try_from(r.start - start_offset)?; + let r_end = usize::try_from(r.end - start_offset)?; + parse_single_column_index( + &bytes[r_start..r_end], + metadata, + c, + rg_idx, + col_idx, + ) + } + None => Ok(Index::NONE), + }) + .collect::>>() + }) + .collect::>>()?; + + metadata.set_column_index(Some(index)); + Ok(()) +} + +#[cfg(feature = "encryption")] +fn parse_single_column_index( + bytes: &[u8], + metadata: &ParquetMetaData, + column: &ColumnChunkMetaData, + row_group_index: usize, + col_index: usize, +) -> crate::errors::Result { + use crate::encryption::decrypt::CryptoContext; + match &column.column_crypto_metadata { + Some(crypto_metadata) => { + let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| { + general_err!("Cannot decrypt column index, no file decryptor set") + })?; + let crypto_context = CryptoContext::for_column( + file_decryptor, + crypto_metadata, + row_group_index, + col_index, + )?; + let column_decryptor = crypto_context.metadata_decryptor(); + let aad = crypto_context.create_column_index_aad()?; + let plaintext = column_decryptor.decrypt(bytes, &aad)?; + decode_column_index(&plaintext, column.column_type()) + } + None => decode_column_index(bytes, column.column_type()), + } +} + +#[cfg(not(feature = "encryption"))] +fn parse_single_column_index( + bytes: &[u8], + _metadata: &ParquetMetaData, + column: &ColumnChunkMetaData, + _row_group_index: usize, + _col_index: usize, +) -> crate::errors::Result { + decode_column_index(bytes, column.column_type()) +} + +pub(crate) fn parse_offset_index( + metadata: &mut ParquetMetaData, + offset_index_policy: PageIndexPolicy, + bytes: &Bytes, + start_offset: u64, +) -> crate::errors::Result<()> { + if offset_index_policy == PageIndexPolicy::Skip { + return Ok(()); + } + let row_groups = metadata.row_groups(); + let mut all_indexes = Vec::with_capacity(row_groups.len()); + for (rg_idx, x) in row_groups.iter().enumerate() { + let mut row_group_indexes = Vec::with_capacity(x.columns().len()); + for (col_idx, c) in x.columns().iter().enumerate() { + let result = match c.offset_index_range() { + Some(r) => { + let r_start = usize::try_from(r.start - start_offset)?; + let r_end = usize::try_from(r.end - start_offset)?; + parse_single_offset_index(&bytes[r_start..r_end], metadata, c, rg_idx, col_idx) + } + None => Err(general_err!("missing offset index")), + }; + + match result { + Ok(index) => row_group_indexes.push(index), + Err(e) => { + if offset_index_policy == PageIndexPolicy::Required { + return Err(e); + } else { + // Invalidate and return + metadata.set_column_index(None); + metadata.set_offset_index(None); + return Ok(()); + } + } + } + } + all_indexes.push(row_group_indexes); + } + metadata.set_offset_index(Some(all_indexes)); + Ok(()) +} + +#[cfg(feature = "encryption")] +fn parse_single_offset_index( + bytes: &[u8], + metadata: &ParquetMetaData, + column: &ColumnChunkMetaData, + row_group_index: usize, + col_index: usize, +) -> crate::errors::Result { + use 
crate::encryption::decrypt::CryptoContext; + match &column.column_crypto_metadata { + Some(crypto_metadata) => { + let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| { + general_err!("Cannot decrypt offset index, no file decryptor set") + })?; + let crypto_context = CryptoContext::for_column( + file_decryptor, + crypto_metadata, + row_group_index, + col_index, + )?; + let column_decryptor = crypto_context.metadata_decryptor(); + let aad = crypto_context.create_offset_index_aad()?; + let plaintext = column_decryptor.decrypt(bytes, &aad)?; + decode_offset_index(&plaintext) + } + None => decode_offset_index(bytes), + } +} + +#[cfg(not(feature = "encryption"))] +fn parse_single_offset_index( + bytes: &[u8], + _metadata: &ParquetMetaData, + _column: &ColumnChunkMetaData, + _row_group_index: usize, + _col_index: usize, +) -> crate::errors::Result { + decode_offset_index(bytes) +} + #[cfg(test)] mod test { use super::*; diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 498567b80bab..639b69232e78 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -38,6 +38,7 @@ use crate::thrift::TSerializable; use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch}; #[cfg(feature = "encryption")] use crate::encryption::decrypt::CryptoContext; +use crate::file::metadata::parser::{parse_column_index, parse_offset_index}; use crate::file::metadata::push_decoder::range_for_page_index; use crate::file::page_index::offset_index::OffsetIndexMetaData; @@ -468,8 +469,11 @@ impl ParquetMetaDataReader { let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?; let offset = range.start; - self.parse_column_index(&bytes, offset)?; - self.parse_offset_index(&bytes, offset)?; + // This should always be set + if let Some(metadata) = self.metadata.as_mut() { + parse_column_index(metadata, self.column_index, &bytes, offset)?; + parse_offset_index(metadata, self.offset_index, &bytes, offset)?; + } Ok(()) } @@ -592,162 +596,6 @@ impl ParquetMetaDataReader { Ok(()) } - fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> { - let metadata = self.metadata.as_mut().unwrap(); - if self.column_index != PageIndexPolicy::Skip { - let index = metadata - .row_groups() - .iter() - .enumerate() - .map(|(rg_idx, x)| { - x.columns() - .iter() - .enumerate() - .map(|(col_idx, c)| match c.column_index_range() { - Some(r) => { - let r_start = usize::try_from(r.start - start_offset)?; - let r_end = usize::try_from(r.end - start_offset)?; - Self::parse_single_column_index( - &bytes[r_start..r_end], - metadata, - c, - rg_idx, - col_idx, - ) - } - None => Ok(Index::NONE), - }) - .collect::>>() - }) - .collect::>>()?; - - metadata.set_column_index(Some(index)); - } - Ok(()) - } - - #[cfg(feature = "encryption")] - fn parse_single_column_index( - bytes: &[u8], - metadata: &ParquetMetaData, - column: &ColumnChunkMetaData, - row_group_index: usize, - col_index: usize, - ) -> Result { - match &column.column_crypto_metadata { - Some(crypto_metadata) => { - let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| { - general_err!("Cannot decrypt column index, no file decryptor set") - })?; - let crypto_context = CryptoContext::for_column( - file_decryptor, - crypto_metadata, - row_group_index, - col_index, - )?; - let column_decryptor = crypto_context.metadata_decryptor(); - let aad = crypto_context.create_column_index_aad()?; - let plaintext = column_decryptor.decrypt(bytes, &aad)?; - 
decode_column_index(&plaintext, column.column_type()) - } - None => decode_column_index(bytes, column.column_type()), - } - } - - #[cfg(not(feature = "encryption"))] - fn parse_single_column_index( - bytes: &[u8], - _metadata: &ParquetMetaData, - column: &ColumnChunkMetaData, - _row_group_index: usize, - _col_index: usize, - ) -> Result { - decode_column_index(bytes, column.column_type()) - } - - fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> { - let metadata = self.metadata.as_mut().unwrap(); - if self.offset_index != PageIndexPolicy::Skip { - let row_groups = metadata.row_groups(); - let mut all_indexes = Vec::with_capacity(row_groups.len()); - for (rg_idx, x) in row_groups.iter().enumerate() { - let mut row_group_indexes = Vec::with_capacity(x.columns().len()); - for (col_idx, c) in x.columns().iter().enumerate() { - let result = match c.offset_index_range() { - Some(r) => { - let r_start = usize::try_from(r.start - start_offset)?; - let r_end = usize::try_from(r.end - start_offset)?; - Self::parse_single_offset_index( - &bytes[r_start..r_end], - metadata, - c, - rg_idx, - col_idx, - ) - } - None => Err(general_err!("missing offset index")), - }; - - match result { - Ok(index) => row_group_indexes.push(index), - Err(e) => { - if self.offset_index == PageIndexPolicy::Required { - return Err(e); - } else { - // Invalidate and return - metadata.set_column_index(None); - metadata.set_offset_index(None); - return Ok(()); - } - } - } - } - all_indexes.push(row_group_indexes); - } - metadata.set_offset_index(Some(all_indexes)); - } - Ok(()) - } - - #[cfg(feature = "encryption")] - fn parse_single_offset_index( - bytes: &[u8], - metadata: &ParquetMetaData, - column: &ColumnChunkMetaData, - row_group_index: usize, - col_index: usize, - ) -> Result { - match &column.column_crypto_metadata { - Some(crypto_metadata) => { - let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| { - general_err!("Cannot decrypt offset index, no file decryptor set") - })?; - let crypto_context = CryptoContext::for_column( - file_decryptor, - crypto_metadata, - row_group_index, - col_index, - )?; - let column_decryptor = crypto_context.metadata_decryptor(); - let aad = crypto_context.create_offset_index_aad()?; - let plaintext = column_decryptor.decrypt(bytes, &aad)?; - decode_offset_index(&plaintext) - } - None => decode_offset_index(bytes), - } - } - - #[cfg(not(feature = "encryption"))] - fn parse_single_offset_index( - bytes: &[u8], - _metadata: &ParquetMetaData, - _column: &ColumnChunkMetaData, - _row_group_index: usize, - _col_index: usize, - ) -> Result { - decode_offset_index(bytes) - } - fn range_for_page_index(&self) -> Option> { // sanity check let metadata = self.metadata.as_ref()?; From db9ecc440fa444f690b2e1ff1db2a99a3de03994 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Sep 2025 17:05:31 -0400 Subject: [PATCH 08/11] fixups --- parquet/src/file/metadata/reader.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 639b69232e78..02aee18ec484 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -590,8 +590,10 @@ impl ParquetMetaDataReader { // Sanity check assert_eq!(bytes.len() as u64, range.end - range.start); - self.parse_column_index(&bytes, range.start)?; - self.parse_offset_index(&bytes, range.start)?; + if let Some(metadata) = self.metadata.as_mut() { + parse_column_index(metadata, self.column_index, 
&bytes, range.start)?; + parse_offset_index(metadata, self.offset_index, &bytes, range.start)?; + } Ok(()) } @@ -1130,6 +1132,7 @@ mod async_tests { use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use tempfile::NamedTempFile; use crate::arrow::ArrowWriter; From c411e3e017ad1f309710376cf9f8b659072fb05c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 12 Sep 2025 17:21:46 -0400 Subject: [PATCH 09/11] Add metadata parsing --- parquet/src/file/metadata/push_decoder.rs | 46 ++++++++++++++--------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 235b5ab78eca..b8e89e660c35 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -17,12 +17,12 @@ use crate::basic::ColumnOrder; use crate::errors::ParquetError; -use crate::file::metadata::parser::decode_metadata; +use crate::file::metadata::parser::{decode_metadata, parse_column_index, parse_offset_index}; use crate::file::metadata::{ FileMetaData, FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, }; -use crate::file::page_index::index_reader::acc_range; +use crate::file::page_index::index_reader::{acc_range, decode_column_index}; use crate::file::reader::ChunkReader; use crate::file::FOOTER_SIZE; use crate::schema::types; @@ -30,6 +30,7 @@ use crate::schema::types::SchemaDescriptor; use crate::thrift::TCompactSliceInputProtocol; use crate::thrift::TSerializable; use crate::DecodeResult; +use bytes::Bytes; use std::ops::Range; use std::sync::Arc; @@ -307,10 +308,9 @@ impl ParquetMetaDataPushDecoder { DecodeState::ReadingFooter => { // need to have the last 8 bytes of the file to decode the metadata let footer_start = file_len.saturating_sub(footer_len); - - ensure_range!(&self.buffers, footer_start..file_len); - // decode footer (we just checked we have the bytes) - let footer_bytes = self.buffers.get_bytes(footer_start, FOOTER_SIZE)?; + let footer_range = footer_start..file_len; + ensure_range!(&self.buffers, footer_range); + let footer_bytes = self.get_bytes(&footer_range)?; let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?; #[cfg(not(feature = "encryption"))] @@ -326,33 +326,36 @@ impl ParquetMetaDataPushDecoder { DecodeState::ReadingMetadata(footer_tail) => { let metadata_len: u64 = footer_tail.metadata_length() as u64; - let metadata_start = self.buffers.file_len() - footer_len - metadata_len; + let metadata_start = file_len - footer_len - metadata_len; + let metadata_end = metadata_start + metadata_len; + let metadata_range = metadata_start..metadata_end; - ensure_range!(&self.buffers, metadata_start..metadata_len); + ensure_range!(&self.buffers, metadata_range); - let metadata_bytes = self - .buffers - .get_bytes(metadata_start, footer_tail.metadata_length())?; + let metadata_bytes = self.get_bytes(&metadata_range)?; let metadata = decode_metadata(&metadata_bytes)?; self.state = DecodeState::ReadingPageIndex(metadata); continue; } - DecodeState::ReadingPageIndex(metadata) => { + DecodeState::ReadingPageIndex(mut metadata) => { let range = range_for_page_index( &metadata, self.column_index_policy, self.offset_index_policy, ); - let Some(range) = range else { + let Some(page_index_range) = range else { // no ranges means no page indexes are needed self.state = DecodeState::Finished; return Ok(DecodeResult::Data(metadata)); }; - ensure_range!(&self.buffers, 
-                return Err(general_err!(
-                    "ParquetMetaDataPushDecoder: page index reading not yet implemented"
-                ));
+                ensure_range!(&self.buffers, page_index_range);
+                let buffer = self.get_bytes(&page_index_range)?;
+                let offset = page_index_range.start;
+                parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
+                parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
+                self.state = DecodeState::Finished;
+                return Ok(DecodeResult::Data(metadata));
             }
 
             DecodeState::Finished => return Ok(DecodeResult::Finished),
@@ -364,6 +367,15 @@ impl ParquetMetaDataPushDecoder {
             }
         }
     }
+
+    fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes, ParquetError> {
+        let start = range.start;
+        let raw_len = range.end - range.start;
+        let len: usize = raw_len.try_into().map_err(|_| {
+            ParquetError::General(format!("Range length too large to fit in usize: {raw_len}",))
+        })?;
+        self.buffers.get_bytes(start, len)
+    }
 }

From a768eafcbfa459ac961fea79a929fd11f4be9670 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 17:26:03 -0400
Subject: [PATCH 10/11] tests passing

---
 parquet/src/file/metadata/push_decoder.rs | 37 +++++++++++++++--------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index b8e89e660c35..d95c653670e5 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -217,16 +217,6 @@ pub struct ParquetMetaDataPushDecoder {
     buffers: crate::util::push_buffers::PushBuffers,
 }
 
-// Macro that checks if the ranges are in buffer, and if not returns NeedsData
-macro_rules! ensure_range {
-    ($buffers:expr, $range:expr) => {
-        if !$buffers.has_range(&$range) {
-            #[expect(clippy::single_range_in_vec_init)]
-            return Ok(DecodeResult::NeedsData(vec![$range]));
-        }
-    };
-}
-
 impl ParquetMetaDataPushDecoder {
     /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
     ///
@@ -309,12 +299,17 @@ impl ParquetMetaDataPushDecoder {
             DecodeState::ReadingFooter => {
                 // need to have the last 8 bytes of the file to decode the metadata
                 let footer_start = file_len.saturating_sub(footer_len);
                 let footer_range = footer_start..file_len;
-                ensure_range!(&self.buffers, footer_range);
+
+                if !self.buffers.has_range(&footer_range) {
+                    self.state = DecodeState::ReadingFooter;
+                    return Ok(needs_range(footer_range));
+                }
                 let footer_bytes = self.get_bytes(&footer_range)?;
                 let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
 
                 #[cfg(not(feature = "encryption"))]
                 if footer_tail.is_encrypted_footer() {
+                    self.state = DecodeState::Finished;
                     return Err(general_err!(
                         "Parquet file has an encrypted footer but the encryption feature is disabled"
                     ));
@@ -330,7 +325,10 @@ impl ParquetMetaDataPushDecoder {
                 let metadata_end = metadata_start + metadata_len;
                 let metadata_range = metadata_start..metadata_end;
 
-                ensure_range!(&self.buffers, metadata_range);
+                if !self.buffers.has_range(&metadata_range) {
+                    self.state = DecodeState::ReadingMetadata(footer_tail);
+                    return Ok(needs_range(metadata_range));
+                }
 
                 let metadata_bytes = self.get_bytes(&metadata_range)?;
                 let metadata = decode_metadata(&metadata_bytes)?;
@@ -344,12 +342,18 @@ impl ParquetMetaDataPushDecoder {
                     self.column_index_policy,
                     self.offset_index_policy,
                 );
+
                 let Some(page_index_range) = range else {
                     // no ranges means no page indexes are needed
                     self.state = DecodeState::Finished;
                     return Ok(DecodeResult::Data(metadata));
                 };
-                ensure_range!(&self.buffers, page_index_range);
+
+                if !self.buffers.has_range(&page_index_range) {
+                    self.state = DecodeState::ReadingPageIndex(metadata);
+                    return Ok(needs_range(page_index_range));
+                }
+
                 let buffer = self.get_bytes(&page_index_range)?;
                 let offset = page_index_range.start;
                 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
@@ -368,6 +372,7 @@ impl ParquetMetaDataPushDecoder {
         }
     }
 
+    /// Returns the bytes for the given range from the internal buffer
     fn get_bytes(&self, range: &Range<u64>) -> Result<Bytes, ParquetError> {
         let start = range.start;
         let raw_len = range.end - range.start;
@@ -378,6 +383,12 @@ impl ParquetMetaDataPushDecoder {
     }
 }
 
+/// Returns a `DecodeResult` that describes needing the given range
+fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
+    #[expect(clippy::single_range_in_vec_init)]
+    DecodeResult::NeedsData(vec![range])
+}
+
 /// Decoding state machine
 #[derive(Debug)]
 enum DecodeState {

From e0de5372a86a92ae1f638a9cf48e6aea198c7233 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 12 Sep 2025 17:43:58 -0400
Subject: [PATCH 11/11] clippy

---
 parquet/src/file/metadata/parser.rs       | 146 +++++++++++++++++++
 parquet/src/file/metadata/push_decoder.rs |  22 +--
 parquet/src/file/metadata/reader.rs       | 168 ++--------------------
 3 files changed, 160 insertions(+), 176 deletions(-)

diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
index 89d7073809fa..4220508f249f 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -35,6 +35,14 @@ use crate::thrift::TSerializable;
 use bytes::Bytes;
 use std::sync::Arc;
 
+#[cfg(feature = "encryption")]
+use crate::encryption::{
+    decrypt::{FileDecryptionProperties, FileDecryptor},
+    modules::create_footer_aad,
+};
+#[cfg(feature = "encryption")]
+use crate::format::EncryptionAlgorithm;
+
 /// Decodes [`ParquetMetaData`] from the provided bytes.
 ///
 /// Typically this is used to decode the metadata from the end of a parquet
@@ -262,6 +270,144 @@ fn parse_single_offset_index(
     decode_offset_index(bytes)
 }
 
+/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as specified
+/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
+/// ciphers as specified in the [Parquet Encryption Spec].
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
+#[cfg(feature = "encryption")]
+pub(crate) fn decode_metadata_with_encryption(
+    buf: &[u8],
+    encrypted_footer: bool,
+    file_decryption_properties: Option<&FileDecryptionProperties>,
+) -> crate::errors::Result<ParquetMetaData> {
+    let mut prot = TCompactSliceInputProtocol::new(buf);
+    let mut file_decryptor = None;
+    let decrypted_fmd_buf;
+
+    if encrypted_footer {
+        if let Some(file_decryption_properties) = file_decryption_properties {
+            let t_file_crypto_metadata: crate::format::FileCryptoMetaData =
+                crate::format::FileCryptoMetaData::read_from_in_protocol(&mut prot)
+                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
+            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
+                EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
+                _ => Some(false),
+            }
+            .unwrap_or(false);
+            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
+                return Err(general_err!(
+                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
+                    but no AAD prefix was provided in the file decryption properties"
+                ));
+            }
+            let decryptor = get_file_decryptor(
+                t_file_crypto_metadata.encryption_algorithm,
+                t_file_crypto_metadata.key_metadata.as_deref(),
+                file_decryption_properties,
+            )?;
+            let footer_decryptor = decryptor.get_footer_decryptor();
+            let aad_footer = create_footer_aad(decryptor.file_aad())?;
+
+            decrypted_fmd_buf = footer_decryptor?
+                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+                .map_err(|_| {
+                    general_err!(
+                        "Provided footer key and AAD were unable to decrypt parquet footer"
+                    )
+                })?;
+            prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
+
+            file_decryptor = Some(decryptor);
+        } else {
+            return Err(general_err!(
+                "Parquet file has an encrypted footer but decryption properties were not provided"
+            ));
+        }
+    }
+
+    use crate::format::FileMetaData as TFileMetaData;
+    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
+        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+    let schema = types::from_thrift(&t_file_metadata.schema)?;
+    let schema_descr = Arc::new(SchemaDescriptor::new(schema));
+
+    if let (Some(algo), Some(file_decryption_properties)) = (
+        t_file_metadata.encryption_algorithm,
+        file_decryption_properties,
+    ) {
+        // File has a plaintext footer but encryption algorithm is set
+        let file_decryptor_value = get_file_decryptor(
+            algo,
+            t_file_metadata.footer_signing_key_metadata.as_deref(),
+            file_decryption_properties,
+        )?;
+        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
+            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
+        }
+        file_decryptor = Some(file_decryptor_value);
+    }
+
+    let mut row_groups = Vec::new();
+    for rg in t_file_metadata.row_groups {
+        let r = RowGroupMetaData::from_encrypted_thrift(
+            schema_descr.clone(),
+            rg,
+            file_decryptor.as_ref(),
+        )?;
+        row_groups.push(r);
+    }
+    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
+
+    let file_metadata = FileMetaData::new(
+        t_file_metadata.version,
+        t_file_metadata.num_rows,
+        t_file_metadata.created_by,
+        t_file_metadata.key_value_metadata,
+        schema_descr,
+        column_orders,
+    );
+    let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
+
+    metadata.with_file_decryptor(file_decryptor);
+
+    Ok(metadata)
+}
+
+#[cfg(feature = "encryption")]
+fn get_file_decryptor(
+    encryption_algorithm: EncryptionAlgorithm,
+    footer_key_metadata: Option<&[u8]>,
+    file_decryption_properties: &FileDecryptionProperties,
+) -> crate::errors::Result<FileDecryptor> {
+    match encryption_algorithm {
+        EncryptionAlgorithm::AESGCMV1(algo) => {
+            let aad_file_unique = algo
+                .aad_file_unique
+                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
+            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
+                aad_prefix.clone()
+            } else {
+                algo.aad_prefix.unwrap_or_default()
+            };
+
+            FileDecryptor::new(
+                file_decryption_properties,
+                footer_key_metadata,
+                aad_file_unique,
+                aad_prefix,
+            )
+        }
+        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
+            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+        )),
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index d95c653670e5..d6fe16f9e3f5 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -15,24 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
-use crate::basic::ColumnOrder;
 use crate::errors::ParquetError;
 use crate::file::metadata::parser::{decode_metadata, parse_column_index, parse_offset_index};
-use crate::file::metadata::{
-    FileMetaData, FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
-    RowGroupMetaData,
-};
-use crate::file::page_index::index_reader::{acc_range, decode_column_index};
+use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
+use crate::file::page_index::index_reader::acc_range;
 use crate::file::reader::ChunkReader;
 use crate::file::FOOTER_SIZE;
-use crate::schema::types;
-use crate::schema::types::SchemaDescriptor;
-use crate::thrift::TCompactSliceInputProtocol;
-use crate::thrift::TSerializable;
 use crate::DecodeResult;
 use bytes::Bytes;
 use std::ops::Range;
-use std::sync::Arc;
 
 /// A push decoder for [`ParquetMetaData`].
 ///
@@ -332,7 +323,7 @@ impl ParquetMetaDataPushDecoder {
 
                 let metadata_bytes = self.get_bytes(&metadata_range)?;
                 let metadata = decode_metadata(&metadata_bytes)?;
-                self.state = DecodeState::ReadingPageIndex(metadata);
+                self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
                 continue;
             }
 
@@ -346,7 +337,7 @@ impl ParquetMetaDataPushDecoder {
                 let Some(page_index_range) = range else {
                     // no ranges means no page indexes are needed
                     self.state = DecodeState::Finished;
-                    return Ok(DecodeResult::Data(metadata));
+                    return Ok(DecodeResult::Data(*metadata));
                 };
 
                 if !self.buffers.has_range(&page_index_range) {
@@ -359,7 +350,7 @@ impl ParquetMetaDataPushDecoder {
                 parse_column_index(&mut metadata, self.column_index_policy, &buffer, offset)?;
                 parse_offset_index(&mut metadata, self.offset_index_policy, &buffer, offset)?;
                 self.state = DecodeState::Finished;
-                return Ok(DecodeResult::Data(metadata));
+                return Ok(DecodeResult::Data(*metadata));
             }
 
             DecodeState::Finished => return Ok(DecodeResult::Finished),
@@ -385,7 +376,6 @@ impl ParquetMetaDataPushDecoder {
 
 /// Returns a `DecodeResult` that describes needing the given range
 fn needs_range(range: Range<u64>) -> DecodeResult<ParquetMetaData> {
-    #[expect(clippy::single_range_in_vec_init)]
     DecodeResult::NeedsData(vec![range])
 }
 
@@ -397,7 +387,7 @@ enum DecodeState {
     /// Reading the metadata thrift structure
     ReadingMetadata(FooterTail),
     //
-    ReadingPageIndex(ParquetMetaData),
+    ReadingPageIndex(Box<ParquetMetaData>),
     // todo read bloom filters?
     Finished,
     /// State left during decoding. This should never be observed outside
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 02aee18ec484..6b456b8f72ef 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -15,32 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
+#[cfg(feature = "async")]
+use bytes::Bytes;
 use std::{io::Read, ops::Range};
 
 #[cfg(feature = "encryption")]
-use crate::encryption::{
-    decrypt::{FileDecryptionProperties, FileDecryptor},
-    modules::create_footer_aad,
-};
-use bytes::Bytes;
+use crate::encryption::decrypt::FileDecryptionProperties;
 
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataPushDecoder};
-use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::ChunkReader;
 use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
-#[cfg(feature = "encryption")]
-use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
-use crate::thrift::TSerializable;
 
 #[cfg(all(feature = "async", feature = "arrow"))]
 use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
-#[cfg(feature = "encryption")]
-use crate::encryption::decrypt::CryptoContext;
-use crate::file::metadata::parser::{parse_column_index, parse_offset_index};
+use crate::file::metadata::parser::{decode_metadata, parse_column_index, parse_offset_index};
 use crate::file::metadata::push_decoder::range_for_page_index;
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
 
 /// Reads the [`ParquetMetaData`] from a byte stream.
 ///
@@ -600,7 +590,6 @@ impl ParquetMetaDataReader {
     fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
-        let metadata = self.metadata.as_ref()?;
         range_for_page_index(
             self.metadata.as_ref()?,
             self.column_index,
@@ -795,7 +784,7 @@ impl ParquetMetaDataReader {
         footer_tail: &FooterTail,
     ) -> Result<ParquetMetaData> {
         #[cfg(feature = "encryption")]
-        let result = Self::decode_metadata_with_encryption(
+        let result = crate::file::metadata::parser::decode_metadata_with_encryption(
             buf,
             footer_tail.is_encrypted_footer(),
             self.file_decryption_properties.as_ref(),
@@ -813,112 +802,6 @@ impl ParquetMetaDataReader {
         result
     }
 
-    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
-    ///
-    /// Typically this is used to decode the metadata from the end of a parquet
-    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
-    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
-    /// ciphers as specified in the [Parquet Encryption Spec].
-    ///
-    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
-    #[cfg(feature = "encryption")]
-    fn decode_metadata_with_encryption(
-        buf: &[u8],
-        encrypted_footer: bool,
-        file_decryption_properties: Option<&FileDecryptionProperties>,
-    ) -> Result<ParquetMetaData> {
-        let mut prot = TCompactSliceInputProtocol::new(buf);
-        let mut file_decryptor = None;
-        let decrypted_fmd_buf;
-
-        if encrypted_footer {
-            if let Some(file_decryption_properties) = file_decryption_properties {
-                let t_file_crypto_metadata: TFileCryptoMetaData =
-                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
-                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
-                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
-                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
-                    _ => Some(false),
-                }
-                .unwrap_or(false);
-                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
-                    return Err(general_err!(
-                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
-                        but no AAD prefix was provided in the file decryption properties"
-                    ));
-                }
-                let decryptor = get_file_decryptor(
-                    t_file_crypto_metadata.encryption_algorithm,
-                    t_file_crypto_metadata.key_metadata.as_deref(),
-                    file_decryption_properties,
-                )?;
-                let footer_decryptor = decryptor.get_footer_decryptor();
-                let aad_footer = create_footer_aad(decryptor.file_aad())?;
-
-                decrypted_fmd_buf = footer_decryptor?
-                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
-                    .map_err(|_| {
-                        general_err!(
-                            "Provided footer key and AAD were unable to decrypt parquet footer"
-                        )
-                    })?;
-                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
-
-                file_decryptor = Some(decryptor);
-            } else {
-                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
-            }
-        }
-
-        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
-            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-        let schema = types::from_thrift(&t_file_metadata.schema)?;
-        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
-
-        if let (Some(algo), Some(file_decryption_properties)) = (
-            t_file_metadata.encryption_algorithm,
-            file_decryption_properties,
-        ) {
-            // File has a plaintext footer but encryption algorithm is set
-            let file_decryptor_value = get_file_decryptor(
-                algo,
-                t_file_metadata.footer_signing_key_metadata.as_deref(),
-                file_decryption_properties,
-            )?;
-            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
-                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
-            }
-            file_decryptor = Some(file_decryptor_value);
-        }
-
-        let mut row_groups = Vec::new();
-        for rg in t_file_metadata.row_groups {
-            let r = RowGroupMetaData::from_encrypted_thrift(
-                schema_descr.clone(),
-                rg,
-                file_decryptor.as_ref(),
-            )?;
-            row_groups.push(r);
-        }
-        let column_orders =
-            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
-
-        let file_metadata = FileMetaData::new(
-            t_file_metadata.version,
-            t_file_metadata.num_rows,
-            t_file_metadata.created_by,
-            t_file_metadata.key_value_metadata,
-            schema_descr,
-            column_orders,
-        );
-        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);
-
-        metadata.with_file_decryptor(file_decryptor);
-
-        Ok(metadata)
-    }
-
     /// Decodes [`ParquetMetaData`] from the provided bytes.
     ///
     /// Typically this is used to decode the metadata from the end of a parquet
@@ -927,51 +810,16 @@ impl ParquetMetaDataReader {
     ///
     /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
     pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
-        Self::decode_metadata(buf)
-    }
-}
-
-#[cfg(feature = "encryption")]
-fn get_file_decryptor(
-    encryption_algorithm: EncryptionAlgorithm,
-    footer_key_metadata: Option<&[u8]>,
-    file_decryption_properties: &FileDecryptionProperties,
-) -> Result<FileDecryptor> {
-    match encryption_algorithm {
-        EncryptionAlgorithm::AESGCMV1(algo) => {
-            let aad_file_unique = algo
-                .aad_file_unique
-                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
-            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
-                aad_prefix.clone()
-            } else {
-                algo.aad_prefix.unwrap_or_default()
-            };
-
-            FileDecryptor::new(
-                file_decryption_properties,
-                footer_key_metadata,
-                aad_file_unique,
-                aad_prefix,
-            )
-        }
-        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
-            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
-        )),
+        decode_metadata(buf)
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use bytes::Bytes;
-
-    use crate::basic::SortOrder;
-    use crate::basic::Type;
     use crate::file::reader::Length;
-    use crate::format::TypeDefinedOrder;
-    use crate::schema::types::Type as SchemaType;
     use crate::util::test_common::file_util::get_test_file;
+    use bytes::Bytes;
 
     #[test]
     fn test_parse_metadata_size_smaller_than_footer() {
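
After patch 11, `ParquetMetaDataPushDecoder::try_decode` is a resumable state machine: each call either yields the decoded metadata or reports exactly which byte ranges it still needs, recording in `DecodeState` where to resume. A minimal caller loop might look like the sketch below. It is illustrative only: `try_decode`, `DecodeResult::{NeedsData, Data, Finished}`, and the push-method signature follow the diffs above, but the constructor name (`try_new`) and import paths are assumptions to be checked against the final API.

```rust
use bytes::Bytes;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaDataPushDecoder; // assumed path
use parquet::DecodeResult;

/// Drive the push decoder to completion against an in-memory copy of a
/// parquet file, satisfying each `NeedsData` request with slices of it.
fn decode_metadata(file: &Bytes) -> Result<(), ParquetError> {
    let file_len = file.len() as u64;
    // `try_new` is a hypothetical constructor name
    let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
    loop {
        match decoder.try_decode()? {
            DecodeResult::NeedsData(ranges) => {
                // Slice the requested ranges out of the in-memory file;
                // a real caller would fetch these from storage instead.
                let buffers: Vec<Bytes> = ranges
                    .iter()
                    .map(|r| file.slice(r.start as usize..r.end as usize))
                    .collect();
                decoder.push_ranges(ranges, buffers)?;
            }
            DecodeResult::Data(metadata) => {
                println!("decoded {} row groups", metadata.num_row_groups());
                return Ok(());
            }
            DecodeResult::Finished => return Ok(()),
        }
    }
}
```

Because the decoder re-enters the state it recorded before returning `NeedsData`, the caller never re-fetches bytes it has already pushed, which is what lets the same decoder back both synchronous and async (`MetadataFetch`-style) front ends.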