Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
e3a0b50
custom PageLocation decoder for speed
etseidl Aug 20, 2025
71d3859
fix recently added test
etseidl Aug 20, 2025
ff42e5a
clippy
etseidl Aug 20, 2025
1f2c216
experimental new form for column index
etseidl Aug 20, 2025
37f3b20
fix for test added in main
etseidl Aug 21, 2025
3d4e28e
refactor new column index
etseidl Aug 21, 2025
2b85b89
checkpoint...everything but stats converter
etseidl Aug 21, 2025
5ee1b8f
fix bug found in testing
etseidl Aug 21, 2025
624b88b
Merge branch 'new_col_idx' into new_col_idx_full
etseidl Aug 21, 2025
d99a06a
stats converter works
etseidl Aug 22, 2025
79a6917
get rid of import
etseidl Aug 22, 2025
878d460
get parquet-index working
etseidl Aug 22, 2025
009632a
doc fixes
etseidl Aug 22, 2025
998ac6c
Merge branch 'offset_idx_speedup' into new_col_idx_full
etseidl Aug 22, 2025
a822dfd
move column index to its own module
etseidl Aug 22, 2025
20df075
add ColumnIndexIterators trait, simplify stats converter a little
etseidl Aug 22, 2025
7755b7b
restore comment
etseidl Aug 22, 2025
66ed8bc
Merge branch 'new_col_idx' into new_col_idx_full
etseidl Aug 22, 2025
f6c5738
further rework...allow for fallback to slow decoder
etseidl Aug 24, 2025
3733b86
Merge branch 'offset_idx_speedup' into new_col_idx_full
etseidl Aug 24, 2025
09d71e1
refactor a bit
etseidl Aug 24, 2025
1ddaa35
simplify reading of int array
etseidl Aug 24, 2025
006d59d
Merge branch 'offset_idx_speedup' into new_col_idx_full
etseidl Aug 24, 2025
c271085
get write working for enum and some unions
etseidl Aug 25, 2025
34cdaf2
make test_roundtrip visible
etseidl Aug 25, 2025
c9be570
add test for converted_type, start on logical_type
etseidl Aug 25, 2025
a9cd09d
checkpoint struct field writing
etseidl Aug 25, 2025
ae65167
get some struct examples and lists working
etseidl Aug 25, 2025
272a013
get rid of copied allow
etseidl Aug 25, 2025
632e171
get writer macros for structs working
etseidl Aug 26, 2025
9f01b60
fix bug in struct macro
etseidl Aug 26, 2025
2511f8f
make Repetition public
etseidl Aug 26, 2025
61e9e07
get union working for writes
etseidl Aug 26, 2025
e39f119
add some tests
etseidl Aug 26, 2025
def3d07
redo OrderedF64 initialization
etseidl Aug 26, 2025
386f222
unused import
etseidl Aug 26, 2025
7ae2304
Merge branch 'gh5854_thrift_remodel' into write_thrift
etseidl Aug 26, 2025
6beb79d
get decryption working
etseidl Aug 26, 2025
1eaa17b
refactor and clippy fixes
etseidl Aug 26, 2025
713e38a
add page header defs
etseidl Aug 26, 2025
79e8f85
totally rework the input side
etseidl Aug 27, 2025
b31c9e6
rework struct field reading
etseidl Aug 27, 2025
8c4e49d
fix skipping bool fields
etseidl Aug 27, 2025
e0e1852
remove cruft
etseidl Aug 27, 2025
1ebfdf2
Merge branch 'gh5854_thrift_remodel' into write_thrift
etseidl Aug 27, 2025
366326a
Merge branch 'write_thrift' into read_and_crypto
etseidl Aug 27, 2025
7b8777a
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Aug 27, 2025
d8081a9
fix clippy issues
etseidl Aug 28, 2025
5d6c8b1
allow unused page header structs
etseidl Aug 28, 2025
709e813
remove Write from WriteThrift
etseidl Aug 29, 2025
def1d68
Merge branch 'write_thrift' into read_and_crypto
etseidl Aug 29, 2025
0579456
finish merge
etseidl Aug 29, 2025
c1587c4
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Aug 29, 2025
689297c
Merge branch 'gh5854_thrift_remodel' into write_thrift
etseidl Aug 30, 2025
7d47857
Merge branch 'write_thrift' into read_and_crypto
etseidl Aug 30, 2025
b543838
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Aug 30, 2025
138b0d5
Merge branch 'gh5854_thrift_remodel' into write_thrift
etseidl Sep 5, 2025
5b6c177
Merge branch 'write_thrift' into read_and_crypto
etseidl Sep 5, 2025
88959be
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 5, 2025
c729d22
Merge remote-tracking branch 'origin/gh5854_thrift_remodel' into writ…
etseidl Sep 8, 2025
96419c4
Merge branch 'write_thrift' into read_and_crypto
etseidl Sep 8, 2025
6ec102f
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 8, 2025
f81a732
get a start on some documentation and add some TODOs
etseidl Sep 10, 2025
be58ea6
Merge branch 'write_thrift' into read_and_crypto
etseidl Sep 10, 2025
02e5e16
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 10, 2025
7268dd3
fix docs
etseidl Sep 10, 2025
8305915
Merge branch 'write_thrift' into read_and_crypto
etseidl Sep 10, 2025
4221646
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 10, 2025
f0beb0b
Merge branch 'gh5854_thrift_remodel' into read_and_crypto
etseidl Sep 10, 2025
b303e52
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 10, 2025
cfa6740
backport fix for tests without encryption
etseidl Sep 10, 2025
6c82028
Merge branch 'read_and_crypto' into rework_thrift_reader
etseidl Sep 10, 2025
82f31a4
add documentation
etseidl Sep 11, 2025
4da5d9e
Merge branch 'gh5854_thrift_remodel' into rework_thrift_reader
etseidl Sep 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 21 additions & 31 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use std::{fmt, str};

pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
};
use crate::{thrift_enum, thrift_struct, thrift_union_all_empty};

Expand Down Expand Up @@ -165,9 +165,8 @@ pub enum ConvertedType {
INTERVAL,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ConvertedType {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ConvertedType {
fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
Ok(match val {
0 => Self::UTF8,
Expand Down Expand Up @@ -361,12 +360,9 @@ pub enum LogicalType {
},
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
prot.read_struct_begin()?;

let field_ident = prot.read_field_begin()?;
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for LogicalType {
fn read_thrift(prot: &mut R) -> Result<Self> {
let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("received empty union from remote LogicalType"));
}
Expand All @@ -388,7 +384,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
Self::Enum
}
5 => {
let val = DecimalType::try_from(&mut *prot)?;
let val = DecimalType::read_thrift(&mut *prot)?;
Self::Decimal {
scale: val.scale,
precision: val.precision,
Expand All @@ -399,21 +395,21 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
Self::Date
}
7 => {
let val = TimeType::try_from(&mut *prot)?;
let val = TimeType::read_thrift(&mut *prot)?;
Self::Time {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
8 => {
let val = TimestampType::try_from(&mut *prot)?;
let val = TimestampType::read_thrift(&mut *prot)?;
Self::Timestamp {
is_adjusted_to_u_t_c: val.is_adjusted_to_u_t_c,
unit: val.unit,
}
}
10 => {
let val = IntType::try_from(&mut *prot)?;
let val = IntType::read_thrift(&mut *prot)?;
Self::Integer {
is_signed: val.is_signed,
bit_width: val.bit_width,
Expand All @@ -440,19 +436,19 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
Self::Float16
}
16 => {
let val = VariantType::try_from(&mut *prot)?;
let val = VariantType::read_thrift(&mut *prot)?;
Self::Variant {
specification_version: val.specification_version,
}
}
17 => {
let val = GeometryType::try_from(&mut *prot)?;
let val = GeometryType::read_thrift(&mut *prot)?;
Self::Geometry {
crs: val.crs.map(|s| s.to_owned()),
}
}
18 => {
let val = GeographyType::try_from(&mut *prot)?;
let val = GeographyType::read_thrift(&mut *prot)?;
Self::Geography {
crs: val.crs.map(|s| s.to_owned()),
algorithm: val.algorithm,
Expand All @@ -465,13 +461,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for LogicalType {
}
}
};
let field_ident = prot.read_field_begin()?;
let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote LogicalType"
));
}
prot.read_struct_end()?;
Ok(ret)
}
}
Expand Down Expand Up @@ -756,9 +751,8 @@ pub enum Compression {
LZ4_RAW,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for Compression {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for Compression {
fn read_thrift(prot: &mut R) -> Result<Self> {
let val = prot.read_i32()?;
Ok(match val {
0 => Self::UNCOMPRESSED,
Expand Down Expand Up @@ -1124,12 +1118,9 @@ impl ColumnOrder {
}
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder {
type Error = ParquetError;

fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
prot.read_struct_begin()?;
let field_ident = prot.read_field_begin()?;
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ColumnOrder {
fn read_thrift(prot: &mut R) -> Result<Self> {
let field_ident = prot.read_field_begin(0)?;
if field_ident.field_type == FieldType::Stop {
return Err(general_err!("Received empty union from remote ColumnOrder"));
}
Expand All @@ -1144,13 +1135,12 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder {
Self::UNKNOWN
}
};
let field_ident = prot.read_field_begin()?;
let field_ident = prot.read_field_begin(field_ident.id)?;
if field_ident.field_type != FieldType::Stop {
return Err(general_err!(
"Received multiple fields for union from remote ColumnOrder"
));
}
prot.read_struct_end()?;
Ok(ret)
}
}
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/file/column_crypto_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::format::{
EncryptionWithFooterKey as TEncryptionWithFooterKey,
};
use crate::parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
};
use crate::{thrift_struct, thrift_union};

Expand Down
4 changes: 2 additions & 2 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ use crate::{
use crate::{
basic::{ColumnOrder, Compression, Encoding, Type},
parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
};
use crate::{
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{io::Read, ops::Range};

#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
use crate::parquet_thrift::ThriftCompactInputProtocol;
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
use bytes::Bytes;

use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -962,8 +962,8 @@ impl ParquetMetaDataReader {
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
let mut prot = ThriftCompactInputProtocol::new(buf);
ParquetMetaData::try_from(&mut prot)
let mut prot = ThriftSliceInputProtocol::new(buf);
ParquetMetaData::read_thrift(&mut prot)
}
}

Expand Down
24 changes: 12 additions & 12 deletions parquet/src/file/metadata/thrift_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
statistics::ValueStatistics,
},
parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
},
schema::types::{parquet_schema_from_array, ColumnDescriptor, SchemaDescriptor},
thrift_struct, thrift_union,
Expand All @@ -46,6 +46,7 @@ use crate::{
use crate::{
encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
file::column_crypto_metadata::ColumnCryptoMetaData,
parquet_thrift::ThriftSliceInputProtocol,
schema::types::SchemaDescPtr,
};

Expand Down Expand Up @@ -669,8 +670,8 @@ fn row_group_from_encrypted_thrift(
)
})?;

let mut prot = ThriftCompactInputProtocol::new(decrypted_cc_buf.as_slice());
let col_meta = ColumnMetaData::try_from(&mut prot)?;
let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
c.meta_data = Some(col_meta);
columns.push(convert_column(c, d.clone())?);
} else {
Expand Down Expand Up @@ -699,14 +700,14 @@ pub(crate) fn parquet_metadata_with_encryption(
encrypted_footer: bool,
buf: &[u8],
) -> Result<ParquetMetaData> {
let mut prot = ThriftCompactInputProtocol::new(buf);
let mut prot = ThriftSliceInputProtocol::new(buf);
let mut file_decryptor = None;
let decrypted_fmd_buf;

if encrypted_footer {
if let Some(file_decryption_properties) = file_decryption_properties {
let t_file_crypto_metadata: FileCryptoMetaData =
FileCryptoMetaData::try_from(&mut prot)
FileCryptoMetaData::read_thrift(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
Expand Down Expand Up @@ -734,7 +735,7 @@ pub(crate) fn parquet_metadata_with_encryption(
"Provided footer key and AAD were unable to decrypt parquet footer"
)
})?;
prot = ThriftCompactInputProtocol::new(decrypted_fmd_buf.as_ref());
prot = ThriftSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

file_decryptor = Some(decryptor);
} else {
Expand All @@ -744,7 +745,7 @@ pub(crate) fn parquet_metadata_with_encryption(
}
}

let file_meta = super::thrift_gen::FileMetaData::try_from(&mut prot)
let file_meta = super::thrift_gen::FileMetaData::read_thrift(&mut prot)
.map_err(|e| general_err!("Could not parse metadata: {}", e))?;

let version = file_meta.version;
Expand Down Expand Up @@ -852,10 +853,9 @@ pub(super) fn get_file_decryptor(

/// Create ParquetMetaData from thrift input. Note that this only decodes the file metadata in
/// the Parquet footer. Page indexes will need to be added later.
impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ParquetMetaData {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let file_meta = super::thrift_gen::FileMetaData::try_from(prot)?;
impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for ParquetMetaData {
fn read_thrift(prot: &mut R) -> Result<Self> {
let file_meta = super::thrift_gen::FileMetaData::read_thrift(prot)?;

let version = file_meta.version;
let num_rows = file_meta.num_rows;
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/file/page_encoding_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
use std::io::Write;

use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
};
use crate::thrift_struct;

Expand Down
14 changes: 7 additions & 7 deletions parquet/src/file/page_index/index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::file::page_index::column_index::{
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
};
use crate::thrift_struct;
use std::io::Write;
Expand Down Expand Up @@ -136,15 +136,15 @@ pub fn read_offset_indexes<R: ChunkReader>(
}

pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
let mut prot = ThriftCompactInputProtocol::new(data);
let mut prot = ThriftSliceInputProtocol::new(data);

// Try to read fast-path first. If that fails, fall back to slower but more robust
// decoder.
match OffsetIndexMetaData::try_from_fast(&mut prot) {
Ok(offset_index) => Ok(offset_index),
Err(_) => {
prot = ThriftCompactInputProtocol::new(data);
OffsetIndexMetaData::try_from(&mut prot)
prot = ThriftSliceInputProtocol::new(data);
OffsetIndexMetaData::read_thrift(&mut prot)
}
}
}
Expand All @@ -166,8 +166,8 @@ pub(crate) fn decode_column_index(
data: &[u8],
column_type: Type,
) -> Result<ColumnIndexMetaData, ParquetError> {
let mut prot = ThriftCompactInputProtocol::new(data);
let index = ThriftColumnIndex::try_from(&mut prot)?;
let mut prot = ThriftSliceInputProtocol::new(data);
let index = ThriftColumnIndex::read_thrift(&mut prot)?;

let index = match column_type {
Type::BOOLEAN => {
Expand Down
12 changes: 7 additions & 5 deletions parquet/src/file/page_index/offset_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
use std::io::Write;

use crate::parquet_thrift::{
ElementType, FieldType, ThriftCompactInputProtocol, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
read_thrift_vec, ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
};
use crate::{
errors::{ParquetError, Result},
Expand Down Expand Up @@ -113,7 +113,9 @@ impl OffsetIndexMetaData {
// Fast-path read of offset index. This works because we expect all field deltas to be 1,
// and there's no nesting beyond PageLocation, so no need to save the last field id. Like
// read_page_locations(), this will fail if absolute field id's are used.
pub(super) fn try_from_fast<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
pub(super) fn try_from_fast<'a, R: ThriftCompactInputProtocol<'a>>(
prot: &mut R,
) -> Result<Self> {
// Offset index is a struct with 2 fields. First field is an array of PageLocations,
// the second an optional array of i64.

Expand All @@ -140,7 +142,7 @@ impl OffsetIndexMetaData {
"encountered unknown field while reading OffsetIndex"
));
}
let vec = Vec::<i64>::try_from(&mut *prot)?;
let vec = read_thrift_vec::<i64, R>(&mut *prot)?;
unencoded_byte_array_data_bytes = Some(vec);

// this one should be Stop
Expand All @@ -164,7 +166,7 @@ impl OffsetIndexMetaData {

// Note: this will fail if the fields are either out of order, or if a suboptimal
// encoder doesn't use field deltas.
fn read_page_location<'a>(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<PageLocation> {
fn read_page_location<'a, R: ThriftCompactInputProtocol<'a>>(prot: &mut R) -> Result<PageLocation> {
// there are 3 fields, all mandatory, so all field deltas should be 1
let (field_type, delta) = prot.read_field_header()?;
if delta != 1 || field_type != FieldType::I64 as u8 {
Expand Down
Loading
Loading