diff --git a/parquet-variant-compute/src/arrow_to_variant.rs b/parquet-variant-compute/src/arrow_to_variant.rs new file mode 100644 index 000000000000..c08990de6911 --- /dev/null +++ b/parquet-variant-compute/src/arrow_to_variant.rs @@ -0,0 +1,1995 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; + +use crate::type_conversion::{decimal_to_variant_decimal, CastOptions}; +use arrow::array::{ + Array, AsArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, +}; +use arrow::compute::kernels::cast; +use arrow::datatypes::{ + ArrowNativeType, ArrowPrimitiveType, ArrowTemporalType, ArrowTimestampType, Date32Type, + Date64Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, + RunEndIndexType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; +use arrow::temporal_conversions::{as_date, as_datetime, as_time}; +use arrow_schema::{ArrowError, DataType, TimeUnit}; +use chrono::{DateTime, TimeZone, Utc}; +use parquet_variant::{ + ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal16, VariantDecimal4, + VariantDecimal8, +}; + +// ============================================================================ +// Row-oriented builders for efficient Arrow-to-Variant conversion +// ============================================================================ + +/// Row builder for converting Arrow arrays to VariantArray row by row +pub(crate) enum ArrowToVariantRowBuilder<'a> { + Null(NullArrowToVariantBuilder), + Boolean(BooleanArrowToVariantBuilder<'a>), + PrimitiveInt8(PrimitiveArrowToVariantBuilder<'a, Int8Type>), + PrimitiveInt16(PrimitiveArrowToVariantBuilder<'a, Int16Type>), + PrimitiveInt32(PrimitiveArrowToVariantBuilder<'a, Int32Type>), + PrimitiveInt64(PrimitiveArrowToVariantBuilder<'a, Int64Type>), + PrimitiveUInt8(PrimitiveArrowToVariantBuilder<'a, UInt8Type>), + PrimitiveUInt16(PrimitiveArrowToVariantBuilder<'a, UInt16Type>), + PrimitiveUInt32(PrimitiveArrowToVariantBuilder<'a, UInt32Type>), + PrimitiveUInt64(PrimitiveArrowToVariantBuilder<'a, UInt64Type>), + 
PrimitiveFloat16(PrimitiveArrowToVariantBuilder<'a, Float16Type>), + PrimitiveFloat32(PrimitiveArrowToVariantBuilder<'a, Float32Type>), + PrimitiveFloat64(PrimitiveArrowToVariantBuilder<'a, Float64Type>), + Decimal32(Decimal32ArrowToVariantBuilder<'a>), + Decimal64(Decimal64ArrowToVariantBuilder<'a>), + Decimal128(Decimal128ArrowToVariantBuilder<'a>), + Decimal256(Decimal256ArrowToVariantBuilder<'a>), + TimestampSecond(TimestampArrowToVariantBuilder<'a, TimestampSecondType>), + TimestampMillisecond(TimestampArrowToVariantBuilder<'a, TimestampMillisecondType>), + TimestampMicrosecond(TimestampArrowToVariantBuilder<'a, TimestampMicrosecondType>), + TimestampNanosecond(TimestampArrowToVariantBuilder<'a, TimestampNanosecondType>), + Date32(DateArrowToVariantBuilder<'a, Date32Type>), + Date64(DateArrowToVariantBuilder<'a, Date64Type>), + Time32Second(TimeArrowToVariantBuilder<'a, Time32SecondType>), + Time32Millisecond(TimeArrowToVariantBuilder<'a, Time32MillisecondType>), + Time64Microsecond(TimeArrowToVariantBuilder<'a, Time64MicrosecondType>), + Time64Nanosecond(TimeArrowToVariantBuilder<'a, Time64NanosecondType>), + Binary(BinaryArrowToVariantBuilder<'a, i32>), + LargeBinary(BinaryArrowToVariantBuilder<'a, i64>), + BinaryView(BinaryViewArrowToVariantBuilder<'a>), + FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder<'a>), + Utf8(StringArrowToVariantBuilder<'a, i32>), + LargeUtf8(StringArrowToVariantBuilder<'a, i64>), + Utf8View(StringViewArrowToVariantBuilder<'a>), + List(ListArrowToVariantBuilder<'a, i32>), + LargeList(ListArrowToVariantBuilder<'a, i64>), + Struct(StructArrowToVariantBuilder<'a>), + Map(MapArrowToVariantBuilder<'a>), + Union(UnionArrowToVariantBuilder<'a>), + Dictionary(DictionaryArrowToVariantBuilder<'a>), + RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder<'a, Int16Type>), + RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder<'a, Int32Type>), + RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder<'a, Int64Type>), +} + +impl<'a> 
ArrowToVariantRowBuilder<'a> { + /// Appends a single row at the given index to the supplied builder. + pub fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + index: usize, + ) -> Result<(), ArrowError> { + use ArrowToVariantRowBuilder::*; + match self { + Null(b) => b.append_row(builder, index), + Boolean(b) => b.append_row(builder, index), + PrimitiveInt8(b) => b.append_row(builder, index), + PrimitiveInt16(b) => b.append_row(builder, index), + PrimitiveInt32(b) => b.append_row(builder, index), + PrimitiveInt64(b) => b.append_row(builder, index), + PrimitiveUInt8(b) => b.append_row(builder, index), + PrimitiveUInt16(b) => b.append_row(builder, index), + PrimitiveUInt32(b) => b.append_row(builder, index), + PrimitiveUInt64(b) => b.append_row(builder, index), + PrimitiveFloat16(b) => b.append_row(builder, index), + PrimitiveFloat32(b) => b.append_row(builder, index), + PrimitiveFloat64(b) => b.append_row(builder, index), + Decimal32(b) => b.append_row(builder, index), + Decimal64(b) => b.append_row(builder, index), + Decimal128(b) => b.append_row(builder, index), + Decimal256(b) => b.append_row(builder, index), + TimestampSecond(b) => b.append_row(builder, index), + TimestampMillisecond(b) => b.append_row(builder, index), + TimestampMicrosecond(b) => b.append_row(builder, index), + TimestampNanosecond(b) => b.append_row(builder, index), + Date32(b) => b.append_row(builder, index), + Date64(b) => b.append_row(builder, index), + Time32Second(b) => b.append_row(builder, index), + Time32Millisecond(b) => b.append_row(builder, index), + Time64Microsecond(b) => b.append_row(builder, index), + Time64Nanosecond(b) => b.append_row(builder, index), + Binary(b) => b.append_row(builder, index), + LargeBinary(b) => b.append_row(builder, index), + BinaryView(b) => b.append_row(builder, index), + FixedSizeBinary(b) => b.append_row(builder, index), + Utf8(b) => b.append_row(builder, index), + LargeUtf8(b) => b.append_row(builder, index), + Utf8View(b) => 
b.append_row(builder, index), + List(b) => b.append_row(builder, index), + LargeList(b) => b.append_row(builder, index), + Struct(b) => b.append_row(builder, index), + Map(b) => b.append_row(builder, index), + Union(b) => b.append_row(builder, index), + Dictionary(b) => b.append_row(builder, index), + RunEndEncodedInt16(b) => b.append_row(builder, index), + RunEndEncodedInt32(b) => b.append_row(builder, index), + RunEndEncodedInt64(b) => b.append_row(builder, index), + } + } +} + +/// Factory function to create the appropriate row builder for a given DataType +pub(crate) fn make_arrow_to_variant_row_builder<'a>( + data_type: &'a DataType, + array: &'a dyn Array, + options: &'a CastOptions, +) -> Result, ArrowError> { + use ArrowToVariantRowBuilder::*; + let builder = + match data_type { + DataType::Null => Null(NullArrowToVariantBuilder), + DataType::Boolean => Boolean(BooleanArrowToVariantBuilder::new(array)), + DataType::Int8 => PrimitiveInt8(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Int16 => PrimitiveInt16(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Int32 => PrimitiveInt32(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Int64 => PrimitiveInt64(PrimitiveArrowToVariantBuilder::new(array)), + DataType::UInt8 => PrimitiveUInt8(PrimitiveArrowToVariantBuilder::new(array)), + DataType::UInt16 => PrimitiveUInt16(PrimitiveArrowToVariantBuilder::new(array)), + DataType::UInt32 => PrimitiveUInt32(PrimitiveArrowToVariantBuilder::new(array)), + DataType::UInt64 => PrimitiveUInt64(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Float16 => PrimitiveFloat16(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Float32 => PrimitiveFloat32(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Float64 => PrimitiveFloat64(PrimitiveArrowToVariantBuilder::new(array)), + DataType::Decimal32(_, scale) => { + Decimal32(Decimal32ArrowToVariantBuilder::new(array, *scale)) + } + DataType::Decimal64(_, scale) => { + 
Decimal64(Decimal64ArrowToVariantBuilder::new(array, *scale)) + } + DataType::Decimal128(_, scale) => { + Decimal128(Decimal128ArrowToVariantBuilder::new(array, *scale)) + } + DataType::Decimal256(_, scale) => { + Decimal256(Decimal256ArrowToVariantBuilder::new(array, *scale)) + } + DataType::Timestamp(time_unit, time_zone) => { + match time_unit { + TimeUnit::Second => TimestampSecond(TimestampArrowToVariantBuilder::new( + array, + options, + time_zone.is_some(), + )), + TimeUnit::Millisecond => TimestampMillisecond( + TimestampArrowToVariantBuilder::new(array, options, time_zone.is_some()), + ), + TimeUnit::Microsecond => TimestampMicrosecond( + TimestampArrowToVariantBuilder::new(array, options, time_zone.is_some()), + ), + TimeUnit::Nanosecond => TimestampNanosecond( + TimestampArrowToVariantBuilder::new(array, options, time_zone.is_some()), + ), + } + } + DataType::Date32 => Date32(DateArrowToVariantBuilder::new(array, options)), + DataType::Date64 => Date64(DateArrowToVariantBuilder::new(array, options)), + DataType::Time32(time_unit) => match time_unit { + TimeUnit::Second => Time32Second(TimeArrowToVariantBuilder::new(array, options)), + TimeUnit::Millisecond => { + Time32Millisecond(TimeArrowToVariantBuilder::new(array, options)) + } + _ => { + return Err(ArrowError::CastError(format!( + "Unsupported Time32 unit: {time_unit:?}" + ))) + } + }, + DataType::Time64(time_unit) => match time_unit { + TimeUnit::Microsecond => { + Time64Microsecond(TimeArrowToVariantBuilder::new(array, options)) + } + TimeUnit::Nanosecond => { + Time64Nanosecond(TimeArrowToVariantBuilder::new(array, options)) + } + _ => { + return Err(ArrowError::CastError(format!( + "Unsupported Time64 unit: {time_unit:?}" + ))) + } + }, + DataType::Duration(_) | DataType::Interval(_) => { + return Err(ArrowError::InvalidArgumentError( + "Casting duration/interval types to Variant is not supported. \ + The Variant format does not define duration/interval types." 
+ .to_string(), + )) + } + DataType::Binary => Binary(BinaryArrowToVariantBuilder::new(array)), + DataType::LargeBinary => LargeBinary(BinaryArrowToVariantBuilder::new(array)), + DataType::BinaryView => BinaryView(BinaryViewArrowToVariantBuilder::new(array)), + DataType::FixedSizeBinary(_) => { + FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder::new(array)) + } + DataType::Utf8 => Utf8(StringArrowToVariantBuilder::new(array)), + DataType::LargeUtf8 => LargeUtf8(StringArrowToVariantBuilder::new(array)), + DataType::Utf8View => Utf8View(StringViewArrowToVariantBuilder::new(array)), + DataType::List(_) => List(ListArrowToVariantBuilder::new(array, options)?), + DataType::LargeList(_) => LargeList(ListArrowToVariantBuilder::new(array, options)?), + DataType::Struct(_) => Struct(StructArrowToVariantBuilder::new( + array.as_struct(), + options, + )?), + DataType::Map(_, _) => Map(MapArrowToVariantBuilder::new(array, options)?), + DataType::Union(_, _) => Union(UnionArrowToVariantBuilder::new(array, options)?), + DataType::Dictionary(_, _) => { + Dictionary(DictionaryArrowToVariantBuilder::new(array, options)?) + } + DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() { + DataType::Int16 => { + RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder::new(array, options)?) + } + DataType::Int32 => { + RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder::new(array, options)?) + } + DataType::Int64 => { + RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder::new(array, options)?) + } + _ => { + return Err(ArrowError::CastError(format!( + "Unsupported run ends type: {:?}", + run_ends.data_type() + ))); + } + }, + dt => { + return Err(ArrowError::CastError(format!( + "Unsupported data type for casting to Variant: {dt:?}", + ))); + } + }; + Ok(builder) +} + +/// Macro to define (possibly generic) row builders with consistent structure and behavior. +/// +/// The macro optionally allows to define a transform for values read from the underlying +/// array. 
Transforms of the form `|value| { ... }` are infallible (and should produce something +/// that implements `Into<Variant>`), while transforms of the form `|value| -> Option<_> { ... }` +/// are fallible (and should produce `Option<impl Into<Variant>>`); a failed transform will either +/// append null to the builder or return an error, depending on cast options. +/// +/// Also supports optional extra fields that are passed to the constructor and which are available +/// by reference in the value transform. Providing a fallible value transform requires also +/// providing the extra field `options: &'a CastOptions`. +// TODO: If/when the macro_metavar_expr feature stabilizes, the `ignore` meta-function would allow +// us to "use" captured tokens without emitting them: +// +// ``` +// $( +// ${ignore($value)} +// $( +// ${ignore($option_ty)} +// options: &$lifetime CastOptions, +// )? +// )? +// ``` +// +// That, in turn, would allow us to inject the `options` field whenever the user specifies a +// fallible value transform, instead of requiring them to manually define it. This might not be +// worth the trouble, tho, because it makes for some pretty bulky and unwieldy macro expansions. +macro_rules! define_row_builder { + ( + struct $name:ident<$lifetime:lifetime $(, $generic:ident: $bound:path )?> + $( where $where_path:path: $where_bound:path $(,)? )? + $({ $($field:ident: $field_type:ty),+ $(,)? })?, + |$array_param:ident| -> $array_type:ty { $init_expr:expr } + $(, |$value:ident| $(-> Option<$option_ty:ty>)? $value_transform:expr)? + ) => { + pub(crate) struct $name<$lifetime $(, $generic: $bound )?> + $( where $where_path: $where_bound )? + { + array: &$lifetime $array_type, + $( $( $field: $field_type, )+ )? + } + + impl<$lifetime $(, $generic: $bound+ )?> $name<$lifetime $(, $generic)?> + $( where $where_path: $where_bound )? + { + pub(crate) fn new($array_param: &$lifetime dyn Array $(, $( $field: $field_type ),+ )?) -> Self { + Self { + array: $init_expr, + $( $( $field, )+ )?
+ } + } + + fn append_row(&self, builder: &mut impl VariantBuilderExt, index: usize) -> Result<(), ArrowError> { + if self.array.is_null(index) { + builder.append_null(); + } else { + // Macro hygiene: Give any extra fields names the value transform can access. + // + // The value transform doesn't normally reference cast options, but the macro's + // caller still has to declare the field because stable rust has no way to "use" + // a captured token without emitting it. So, silence unused variable warnings, + // assuming that's the `options` field. Unfortunately, that also silences + // legitimate compiler warnings if an infallible value transform fails to use + // its first extra field. + $( + #[allow(unused)] + $( let $field = &self.$field; )+ + )? + + // Apply the value transform, if any (with name swapping for hygiene) + let value = self.array.value(index); + $( + let $value = value; + let value = $value_transform; + $( + // NOTE: The `?` macro expansion fails without the type annotation. + let Some(value): Option<$option_ty> = value else { + if self.options.strict { + return Err(ArrowError::ComputeError(format!( + "Failed to convert value at index {index}: conversion failed", + ))); + } else { + builder.append_null(); + return Ok(()); + } + }; + )? + )? 
+ builder.append_value(value); + } + Ok(()) + } + } + }; +} + +define_row_builder!( + struct BooleanArrowToVariantBuilder<'a>, + |array| -> arrow::array::BooleanArray { array.as_boolean() } +); + +define_row_builder!( + struct PrimitiveArrowToVariantBuilder<'a, T: ArrowPrimitiveType> + where T::Native: Into>, + |array| -> PrimitiveArray { array.as_primitive() } +); + +define_row_builder!( + struct Decimal32ArrowToVariantBuilder<'a> { + scale: i8, + }, + |array| -> arrow::array::Decimal32Array { array.as_primitive() }, + |value| decimal_to_variant_decimal!(value, scale, i32, VariantDecimal4) +); + +define_row_builder!( + struct Decimal64ArrowToVariantBuilder<'a> { + scale: i8, + }, + |array| -> arrow::array::Decimal64Array { array.as_primitive() }, + |value| decimal_to_variant_decimal!(value, scale, i64, VariantDecimal8) +); + +define_row_builder!( + struct Decimal128ArrowToVariantBuilder<'a> { + scale: i8, + }, + |array| -> arrow::array::Decimal128Array { array.as_primitive() }, + |value| decimal_to_variant_decimal!(value, scale, i128, VariantDecimal16) +); + +define_row_builder!( + struct Decimal256ArrowToVariantBuilder<'a> { + scale: i8, + }, + |array| -> arrow::array::Decimal256Array { array.as_primitive() }, + |value| { + // Decimal256 needs special handling - convert to i128 if possible + match value.to_i128() { + Some(i128_val) => decimal_to_variant_decimal!(i128_val, scale, i128, VariantDecimal16), + None => Variant::Null, // Value too large for i128 + } + } +); + +define_row_builder!( + struct TimestampArrowToVariantBuilder<'a, T: ArrowTimestampType> { + options: &'a CastOptions, + has_time_zone: bool, + }, + |array| -> arrow::array::PrimitiveArray { array.as_primitive() }, + |value| -> Option<_> { + // Convert using Arrow's temporal conversion functions + as_datetime::(value).map(|naive_datetime| { + if *has_time_zone { + // Has timezone -> DateTime -> TimestampMicros/TimestampNanos + let utc_dt: DateTime = Utc.from_utc_datetime(&naive_datetime); + 
Variant::from(utc_dt) // Uses From> for Variant + } else { + // No timezone -> NaiveDateTime -> TimestampNtzMicros/TimestampNtzNanos + Variant::from(naive_datetime) // Uses From for Variant + } + }) + } +); + +define_row_builder!( + struct DateArrowToVariantBuilder<'a, T: ArrowTemporalType> + where + i64: From, + { + options: &'a CastOptions, + }, + |array| -> PrimitiveArray { array.as_primitive() }, + |value| -> Option<_> { + let date_value = i64::from(value); + as_date::(date_value) + } +); + +define_row_builder!( + struct TimeArrowToVariantBuilder<'a, T: ArrowTemporalType> + where + i64: From, + { + options: &'a CastOptions, + }, + |array| -> PrimitiveArray { array.as_primitive() }, + |value| -> Option<_> { + let time_value = i64::from(value); + as_time::(time_value) + } +); + +define_row_builder!( + struct BinaryArrowToVariantBuilder<'a, O: OffsetSizeTrait>, + |array| -> GenericBinaryArray { array.as_binary() } +); + +define_row_builder!( + struct BinaryViewArrowToVariantBuilder<'a>, + |array| -> arrow::array::BinaryViewArray { array.as_byte_view() } +); + +define_row_builder!( + struct FixedSizeBinaryArrowToVariantBuilder<'a>, + |array| -> arrow::array::FixedSizeBinaryArray { array.as_fixed_size_binary() } +); + +define_row_builder!( + struct StringArrowToVariantBuilder<'a, O: OffsetSizeTrait>, + |array| -> GenericStringArray { array.as_string() } +); + +define_row_builder!( + struct StringViewArrowToVariantBuilder<'a>, + |array| -> arrow::array::StringViewArray { array.as_string_view() } +); + +/// Null builder that always appends null +pub(crate) struct NullArrowToVariantBuilder; + +impl NullArrowToVariantBuilder { + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + _index: usize, + ) -> Result<(), ArrowError> { + builder.append_null(); + Ok(()) + } +} + +/// Generic list builder for List and LargeList types +pub(crate) struct ListArrowToVariantBuilder<'a, O: OffsetSizeTrait> { + list_array: &'a arrow::array::GenericListArray, + 
values_builder: Box<ArrowToVariantRowBuilder<'a>>,
}

impl<'a, O: OffsetSizeTrait> ListArrowToVariantBuilder<'a, O> {
    /// Wraps a `List`/`LargeList` array and prepares a row builder for its child values.
    ///
    /// Returns an error if no row builder can be created for the child data type.
    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
        let list_array = array.as_list();
        let child = list_array.values();
        let values_builder = Box::new(make_arrow_to_variant_row_builder(
            child.data_type(),
            child.as_ref(),
            options,
        )?);

        Ok(Self {
            list_array,
            values_builder,
        })
    }

    /// Appends row `index` as a Variant list, or a null when the list slot itself is null.
    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        if self.list_array.is_null(index) {
            builder.append_null();
            return Ok(());
        }

        // The child rows of list `index` live in `offsets[index]..offsets[index + 1]`.
        let offsets = self.list_array.offsets();
        let first = offsets[index].as_usize();
        let past_last = offsets[index + 1].as_usize();

        let mut list_builder = builder.try_new_list()?;
        for child_index in first..past_last {
            self.values_builder
                .append_row(&mut list_builder, child_index)?;
        }
        list_builder.finish();
        Ok(())
    }
}

/// Row builder that turns each row of a `StructArray` into a Variant object
pub(crate) struct StructArrowToVariantBuilder<'a> {
    struct_array: &'a arrow::array::StructArray,
    field_builders: Vec<(&'a str, ArrowToVariantRowBuilder<'a>)>,
}

impl<'a> StructArrowToVariantBuilder<'a> {
    /// Pairs every column name with a row builder for that column.
    ///
    /// Stops at, and returns, the first error raised while creating a column's row builder.
    pub(crate) fn new(
        struct_array: &'a arrow::array::StructArray,
        options: &'a CastOptions,
    ) -> Result<Self, ArrowError> {
        let field_builders = struct_array
            .column_names()
            .into_iter()
            .zip(struct_array.columns())
            .map(|(name, column)| {
                let row_builder = make_arrow_to_variant_row_builder(
                    column.data_type(),
                    column.as_ref(),
                    options,
                )?;
                Ok((name, row_builder))
            })
            .collect::<Result<Vec<_>, ArrowError>>()?;

        Ok(Self {
            struct_array,
            field_builders,
        })
    }

    /// Appends row `index` as a Variant object, or a null when the struct slot is null.
    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        if self.struct_array.is_null(index) {
            builder.append_null();
            return Ok(());
        }

        // One object per struct row; each column writes its value under its own field name.
        let mut obj_builder = builder.try_new_object()?;
        for (field_name, row_builder) in self.field_builders.iter_mut() {
            let mut field_builder =
                parquet_variant::ObjectFieldBuilder::new(field_name, &mut obj_builder);
            row_builder.append_row(&mut field_builder, index)?;
        }
        obj_builder.finish();
        Ok(())
    }
}

/// Row builder that turns each row of a `MapArray` into a Variant object whose field names
/// are the map keys cast to strings
pub(crate) struct MapArrowToVariantBuilder<'a> {
    map_array: &'a arrow::array::MapArray,
    key_strings: arrow::array::StringArray,
    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
}

impl<'a> MapArrowToVariantBuilder<'a> {
    /// Casts all map keys to `Utf8` once and prepares a row builder for the map values.
    ///
    /// Returns an error if the key cast fails or the value type has no row builder.
    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
        let map_array = array.as_map();

        // Done once here so `append_row` never has to convert keys per row.
        let key_strings = cast(map_array.keys(), &DataType::Utf8)?
            .as_string::<i32>()
            .clone();

        // Values may be of any supported type, so build their row builder recursively.
        let entries = map_array.values();
        let values_builder = Box::new(make_arrow_to_variant_row_builder(
            entries.data_type(),
            entries.as_ref(),
            options,
        )?);

        Ok(Self {
            map_array,
            key_strings,
            values_builder,
        })
    }

    /// Appends row `index` as a Variant object, or a null when the map slot itself is null.
    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        // A null map (per the null bitmap) is emitted as a Variant null, not an empty object
        if self.map_array.is_null(index) {
            builder.append_null();
            return Ok(());
        }

        // The key/value entries of map `index` live in `offsets[index]..offsets[index + 1]`.
        let offsets = self.map_array.offsets();
        let start = offsets[index].as_usize();
        let end = offsets[index + 1].as_usize();

        // One object per map row
        let mut object_builder = builder.try_new_object()?;

        // Add each key-value pair (loop does nothing for empty maps - correct!)
for kv_index in start..end {
            let key = self.key_strings.value(kv_index);
            let mut field_builder = ObjectFieldBuilder::new(key, &mut object_builder);
            self.values_builder
                .append_row(&mut field_builder, kv_index)?;
        }

        object_builder.finish();
        Ok(())
    }
}

/// Union builder for both sparse and dense union arrays
///
/// NOTE: Union type ids are _not_ required to be dense, hence the hash map for child builders.
pub(crate) struct UnionArrowToVariantBuilder<'a> {
    union_array: &'a arrow::array::UnionArray,
    child_builders: HashMap<i8, Box<ArrowToVariantRowBuilder<'a>>>,
}

impl<'a> UnionArrowToVariantBuilder<'a> {
    /// Creates one child row builder for every type id that occurs in `array`.
    ///
    /// # Errors
    ///
    /// Returns the error raised by `make_arrow_to_variant_row_builder` if a referenced child
    /// has a data type that cannot be converted.
    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
        let union_array = array.as_union();
        let type_ids = union_array.type_ids();

        // `type_ids` holds one entry per row, so the same id normally shows up many times.
        // Build each child builder only the first time its id is seen: constructing a builder
        // can be expensive for nested children, and rebuilding it for every row would just
        // overwrite an equivalent, freshly constructed builder.
        let mut child_builders = HashMap::new();
        for &type_id in type_ids {
            if let std::collections::hash_map::Entry::Vacant(slot) = child_builders.entry(type_id)
            {
                let child_array = union_array.child(type_id);
                let child_builder = make_arrow_to_variant_row_builder(
                    child_array.data_type(),
                    child_array.as_ref(),
                    options,
                )?;
                slot.insert(Box::new(child_builder));
            }
        }

        Ok(Self {
            union_array,
            child_builders,
        })
    }

    /// Appends the value of row `index` by delegating to the child selected by its type id.
    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        let type_id = self.union_array.type_id(index);
        // Row position inside the selected child array.
        let value_offset = self.union_array.value_offset(index);

        // Delegate to the appropriate child builder, or append null to handle an invalid type_id
        match self.child_builders.get_mut(&type_id) {
            Some(child_builder) => child_builder.append_row(builder, value_offset)?,
            None => builder.append_null(),
        }

        Ok(())
    }
}

/// Dictionary array builder with simple O(1) indexing
pub(crate) struct DictionaryArrowToVariantBuilder<'a> {
    keys: &'a dyn Array, // only needed for null checks
    normalized_keys: Vec<usize>,
    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
}

impl<'a> DictionaryArrowToVariantBuilder<'a> {
    pub(crate) fn new(array: &'a dyn Array,
options: &'a CastOptions) -> Result<Self, ArrowError> {
        let dict_array = array.as_any_dictionary();
        let values = dict_array.values();
        let values_builder =
            make_arrow_to_variant_row_builder(values.data_type(), values.as_ref(), options)?;

        // WARNING: normalized_keys panics if values is empty
        let normalized_keys = match values.len() {
            0 => Vec::new(),
            _ => dict_array.normalized_keys(),
        };

        Ok(Self {
            keys: dict_array.keys(),
            normalized_keys,
            values_builder: Box::new(values_builder),
        })
    }

    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        if self.keys.is_null(index) {
            builder.append_null();
        } else {
            let normalized_key = self.normalized_keys[index];
            self.values_builder.append_row(builder, normalized_key)?;
        }
        Ok(())
    }
}

/// Run-end encoded array builder with efficient sequential access
///
/// The builder remembers the run that served the previous row, so rows requested in
/// increasing order are resolved without searching; any other access pattern falls back to a
/// binary search over the run ends.
pub(crate) struct RunEndEncodedArrowToVariantBuilder<'a, R: RunEndIndexType> {
    run_array: &'a arrow::array::RunArray<R>,
    values_builder: Box<ArrowToVariantRowBuilder<'a>>,

    run_ends: &'a [R::Native],
    run_number: usize, // Physical index into run_ends and values
    run_start: usize,  // Logical start index of current run
}

impl<'a, R: RunEndIndexType> RunEndEncodedArrowToVariantBuilder<'a, R> {
    /// Creates a builder positioned on the first run.
    ///
    /// # Errors
    ///
    /// Returns a `CastError` if `array` is not a run array with run-end type `R`, or the
    /// error raised while creating the row builder for the run values.
    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
        let Some(run_array) = array.as_run_opt() else {
            return Err(ArrowError::CastError("Expected RunArray".to_string()));
        };

        let values = run_array.values();
        let values_builder =
            make_arrow_to_variant_row_builder(values.data_type(), values.as_ref(), options)?;

        Ok(Self {
            run_array,
            values_builder: Box::new(values_builder),
            // NOTE(review): indices are compared against these run ends as-is; confirm whether
            // a sliced RunArray (non-zero offset) needs its offset added to the logical index.
            run_ends: run_array.run_ends().values(),
            run_number: 0,
            run_start: 0,
        })
    }

    /// Positions `run_number`/`run_start` on the run that contains logical row `index`.
    ///
    /// # Errors
    ///
    /// Returns a `CastError` when `index` lies at or beyond the last run end.
    fn set_run_for_index(&mut self, index: usize) -> Result<(), ArrowError> {
        if index >= self.run_start {
            let Some(run_end) = self.run_ends.get(self.run_number) else {
                return Err(ArrowError::CastError(format!(
                    "Index {index} beyond run array"
                )));
            };
            let run_end = run_end.as_usize();

            // Still inside the current run
            if index < run_end {
                return Ok(());
            }

            // First row of the next run. Only step forward when that run exists: without the
            // bounds check, an index equal to the final run end would leave `run_number` past
            // the last run and `append_row` would index the values out of bounds. In that case
            // fall through to the search below, which reports the error.
            if index == run_end && self.run_number + 1 < self.run_ends.len() {
                self.run_number += 1;
                self.run_start = run_end;
                return Ok(());
            }
        }

        // Use partition_point for all non-sequential cases
        let run_number = self
            .run_ends
            .partition_point(|&run_end| run_end.as_usize() <= index);
        if run_number >= self.run_ends.len() {
            return Err(ArrowError::CastError(format!(
                "Index {index} beyond run array"
            )));
        }
        self.run_number = run_number;
        self.run_start = match run_number {
            0 => 0,
            _ => self.run_ends[run_number - 1].as_usize(),
        };
        Ok(())
    }

    /// Appends the value of logical row `index`, or a null when its run holds a null value.
    fn append_row(
        &mut self,
        builder: &mut impl VariantBuilderExt,
        index: usize,
    ) -> Result<(), ArrowError> {
        self.set_run_for_index(index)?;

        // Handle null values
        if self.run_array.values().is_null(self.run_number) {
            builder.append_null();
            return Ok(());
        }

        // Re-encode the value
        self.values_builder.append_row(builder, self.run_number)?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{VariantArray, VariantArrayBuilder};
    use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
    use std::sync::Arc;

    /// Builds a VariantArray from an Arrow array using the row builder.
+ fn execute_row_builder_test(array: &dyn Array) -> VariantArray { + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(array.data_type(), array, &options).unwrap(); + + let mut array_builder = VariantArrayBuilder::new(array.len()); + + // The repetitive loop that appears in every test + for i in 0..array.len() { + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, i).unwrap(); + variant_builder.finish(); + } + + let variant_array = array_builder.build(); + assert_eq!(variant_array.len(), array.len()); + variant_array + } + + /// Generic helper function to test row builders with basic assertion patterns. + /// Uses execute_row_builder_test and adds simple value comparison assertions. + fn test_row_builder_basic(array: &dyn Array, expected_values: Vec>) { + let variant_array = execute_row_builder_test(array); + + // The repetitive assertion pattern + for (i, expected) in expected_values.iter().enumerate() { + match expected { + Some(variant) => { + assert_eq!(variant_array.value(i), *variant, "Mismatch at index {}", i) + } + None => assert!(variant_array.is_null(i), "Expected null at index {}", i), + } + } + } + + #[test] + fn test_primitive_row_builder() { + let int_array = Int32Array::from(vec![Some(42), None, Some(100)]); + test_row_builder_basic( + &int_array, + vec![Some(Variant::Int32(42)), None, Some(Variant::Int32(100))], + ); + } + + #[test] + fn test_string_row_builder() { + let string_array = StringArray::from(vec![Some("hello"), None, Some("world")]); + test_row_builder_basic( + &string_array, + vec![ + Some(Variant::from("hello")), + None, + Some(Variant::from("world")), + ], + ); + } + + #[test] + fn test_boolean_row_builder() { + let bool_array = BooleanArray::from(vec![Some(true), None, Some(false)]); + test_row_builder_basic( + &bool_array, + vec![Some(Variant::from(true)), None, Some(Variant::from(false))], + ); + } + + #[test] + fn 
test_struct_row_builder() { + use arrow::array::{ArrayRef, Int32Array, StringArray, StructArray}; + use arrow_schema::{DataType, Field}; + use std::sync::Arc; + + // Create a struct array with int and string fields + let int_field = Field::new("id", DataType::Int32, true); + let string_field = Field::new("name", DataType::Utf8, true); + + let int_array = Int32Array::from(vec![Some(1), None, Some(3)]); + let string_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]); + + let struct_array = StructArray::try_new( + vec![int_field, string_field].into(), + vec![ + Arc::new(int_array) as ArrayRef, + Arc::new(string_array) as ArrayRef, + ], + None, + ) + .unwrap(); + + let variant_array = execute_row_builder_test(&struct_array); + + // Check first row - should have both fields + let first_variant = variant_array.value(0); + assert_eq!(first_variant.get_object_field("id"), Some(Variant::from(1))); + assert_eq!( + first_variant.get_object_field("name"), + Some(Variant::from("Alice")) + ); + + // Check second row - should have name field but not id (null field omitted) + let second_variant = variant_array.value(1); + assert_eq!(second_variant.get_object_field("id"), None); // null field omitted + assert_eq!( + second_variant.get_object_field("name"), + Some(Variant::from("Bob")) + ); + + // Check third row - should have id field but not name (null field omitted) + let third_variant = variant_array.value(2); + assert_eq!(third_variant.get_object_field("id"), Some(Variant::from(3))); + assert_eq!(third_variant.get_object_field("name"), None); // null field omitted + } + + #[test] + fn test_run_end_encoded_row_builder() { + use arrow::array::{Int32Array, RunArray}; + use arrow::datatypes::Int32Type; + + // Create a run-end encoded array: [A, A, B, B, B, C] + // run_ends: [2, 5, 6] + // values: ["A", "B", "C"] + let values = StringArray::from(vec!["A", "B", "C"]); + let run_ends = Int32Array::from(vec![2, 5, 6]); + let run_array = RunArray::::try_new(&run_ends, 
&values).unwrap(); + + let variant_array = execute_row_builder_test(&run_array); + + // Verify the values + assert_eq!(variant_array.value(0), Variant::from("A")); // Run 0 + assert_eq!(variant_array.value(1), Variant::from("A")); // Run 0 + assert_eq!(variant_array.value(2), Variant::from("B")); // Run 1 + assert_eq!(variant_array.value(3), Variant::from("B")); // Run 1 + assert_eq!(variant_array.value(4), Variant::from("B")); // Run 1 + assert_eq!(variant_array.value(5), Variant::from("C")); // Run 2 + } + + #[test] + fn test_run_end_encoded_random_access() { + use arrow::array::{Int32Array, RunArray}; + use arrow::datatypes::Int32Type; + + // Create a run-end encoded array: [A, A, B, B, B, C] + let values = StringArray::from(vec!["A", "B", "C"]); + let run_ends = Int32Array::from(vec![2, 5, 6]); + let run_array = RunArray::::try_new(&run_ends, &values).unwrap(); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(run_array.data_type(), &run_array, &options).unwrap(); + + // Test random access pattern (backward jumps, forward jumps) + let access_pattern = [0, 5, 2, 4, 1, 3]; // Mix of all cases + let expected_values = ["A", "C", "B", "B", "A", "B"]; + + for (i, &index) in access_pattern.iter().enumerate() { + let mut array_builder = VariantArrayBuilder::new(1); + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, index).unwrap(); + variant_builder.finish(); + + let variant_array = array_builder.build(); + assert_eq!(variant_array.value(0), Variant::from(expected_values[i])); + } + } + + #[test] + fn test_run_end_encoded_with_nulls() { + use arrow::array::{Int32Array, RunArray}; + use arrow::datatypes::Int32Type; + + // Create a run-end encoded array with null values: [A, A, null, null, B] + let values = StringArray::from(vec![Some("A"), None, Some("B")]); + let run_ends = Int32Array::from(vec![2, 4, 5]); + let run_array = RunArray::::try_new(&run_ends, 
&values).unwrap(); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(run_array.data_type(), &run_array, &options).unwrap(); + let mut array_builder = VariantArrayBuilder::new(5); + + // Test sequential access + for i in 0..5 { + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, i).unwrap(); + variant_builder.finish(); + } + + let variant_array = array_builder.build(); + assert_eq!(variant_array.len(), 5); + + // Verify the values + assert_eq!(variant_array.value(0), Variant::from("A")); // Run 0 + assert_eq!(variant_array.value(1), Variant::from("A")); // Run 0 + assert!(variant_array.is_null(2)); // Run 1 (null) + assert!(variant_array.is_null(3)); // Run 1 (null) + assert_eq!(variant_array.value(4), Variant::from("B")); // Run 2 + } + + #[test] + fn test_dictionary_row_builder() { + use arrow::array::{DictionaryArray, Int32Array}; + use arrow::datatypes::Int32Type; + + // Create a dictionary array: keys=[0, 1, 0, 2, 1], values=["apple", "banana", "cherry"] + let values = StringArray::from(vec!["apple", "banana", "cherry"]); + let keys = Int32Array::from(vec![0, 1, 0, 2, 1]); + let dict_array = DictionaryArray::::try_new(keys, Arc::new(values)).unwrap(); + + let variant_array = execute_row_builder_test(&dict_array); + + // Verify the values match the dictionary lookup + assert_eq!(variant_array.value(0), Variant::from("apple")); // keys[0] = 0 -> values[0] = "apple" + assert_eq!(variant_array.value(1), Variant::from("banana")); // keys[1] = 1 -> values[1] = "banana" + assert_eq!(variant_array.value(2), Variant::from("apple")); // keys[2] = 0 -> values[0] = "apple" + assert_eq!(variant_array.value(3), Variant::from("cherry")); // keys[3] = 2 -> values[2] = "cherry" + assert_eq!(variant_array.value(4), Variant::from("banana")); // keys[4] = 1 -> values[1] = "banana" + } + + #[test] + fn test_dictionary_with_nulls() { + use arrow::array::{DictionaryArray, 
Int32Array}; + use arrow::datatypes::Int32Type; + + // Create a dictionary array with null keys: keys=[0, null, 1, null, 2], values=["x", "y", "z"] + let values = StringArray::from(vec!["x", "y", "z"]); + let keys = Int32Array::from(vec![Some(0), None, Some(1), None, Some(2)]); + let dict_array = DictionaryArray::::try_new(keys, Arc::new(values)).unwrap(); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(dict_array.data_type(), &dict_array, &options) + .unwrap(); + let mut array_builder = VariantArrayBuilder::new(5); + + // Test sequential access + for i in 0..5 { + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, i).unwrap(); + variant_builder.finish(); + } + + let variant_array = array_builder.build(); + assert_eq!(variant_array.len(), 5); + + // Verify the values and nulls + assert_eq!(variant_array.value(0), Variant::from("x")); // keys[0] = 0 -> values[0] = "x" + assert!(variant_array.is_null(1)); // keys[1] = null + assert_eq!(variant_array.value(2), Variant::from("y")); // keys[2] = 1 -> values[1] = "y" + assert!(variant_array.is_null(3)); // keys[3] = null + assert_eq!(variant_array.value(4), Variant::from("z")); // keys[4] = 2 -> values[2] = "z" + } + + #[test] + fn test_dictionary_random_access() { + use arrow::array::{DictionaryArray, Int32Array}; + use arrow::datatypes::Int32Type; + + // Create a dictionary array: keys=[0, 1, 2, 0, 1, 2], values=["red", "green", "blue"] + let values = StringArray::from(vec!["red", "green", "blue"]); + let keys = Int32Array::from(vec![0, 1, 2, 0, 1, 2]); + let dict_array = DictionaryArray::::try_new(keys, Arc::new(values)).unwrap(); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(dict_array.data_type(), &dict_array, &options) + .unwrap(); + + // Test random access pattern + let access_pattern = [5, 0, 3, 1, 4, 2]; // Random order + let expected_values = 
["blue", "red", "red", "green", "green", "blue"]; + + for (i, &index) in access_pattern.iter().enumerate() { + let mut array_builder = VariantArrayBuilder::new(1); + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, index).unwrap(); + variant_builder.finish(); + + let variant_array = array_builder.build(); + assert_eq!(variant_array.value(0), Variant::from(expected_values[i])); + } + } + + #[test] + fn test_nested_dictionary() { + use arrow::array::{DictionaryArray, Int32Array, StructArray}; + use arrow::datatypes::{Field, Int32Type}; + + // Create a dictionary with struct values + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int32, false)), + Arc::new(id_array) as ArrayRef, + ), + ( + Arc::new(Field::new("name", DataType::Utf8, false)), + Arc::new(name_array) as ArrayRef, + ), + ]); + + let keys = Int32Array::from(vec![0, 1, 0, 2, 1]); + let dict_array = + DictionaryArray::::try_new(keys, Arc::new(struct_array)).unwrap(); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(dict_array.data_type(), &dict_array, &options) + .unwrap(); + let mut array_builder = VariantArrayBuilder::new(5); + + // Test sequential access + for i in 0..5 { + let mut variant_builder = array_builder.variant_builder(); + row_builder.append_row(&mut variant_builder, i).unwrap(); + variant_builder.finish(); + } + + let variant_array = array_builder.build(); + assert_eq!(variant_array.len(), 5); + + // Verify the nested struct values + let first_variant = variant_array.value(0); + assert_eq!(first_variant.get_object_field("id"), Some(Variant::from(1))); + assert_eq!( + first_variant.get_object_field("name"), + Some(Variant::from("Alice")) + ); + + let second_variant = variant_array.value(1); + assert_eq!( + 
second_variant.get_object_field("id"), + Some(Variant::from(2)) + ); + assert_eq!( + second_variant.get_object_field("name"), + Some(Variant::from("Bob")) + ); + + // Test that repeated keys give same values + let third_variant = variant_array.value(2); + assert_eq!(third_variant.get_object_field("id"), Some(Variant::from(1))); + assert_eq!( + third_variant.get_object_field("name"), + Some(Variant::from("Alice")) + ); + } + + #[test] + fn test_list_row_builder() { + use arrow::array::ListArray; + + // Create a list array: [[1, 2], [3, 4, 5], null, []] + let data = vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4), Some(5)]), + None, + Some(vec![]), + ]; + let list_array = ListArray::from_iter_primitive::(data); + + let variant_array = execute_row_builder_test(&list_array); + + // Row 0: [1, 2] + let row0 = variant_array.value(0); + let list0 = row0.as_list().unwrap(); + assert_eq!(list0.len(), 2); + assert_eq!(list0.get(0), Some(Variant::from(1))); + assert_eq!(list0.get(1), Some(Variant::from(2))); + + // Row 1: [3, 4, 5] + let row1 = variant_array.value(1); + let list1 = row1.as_list().unwrap(); + assert_eq!(list1.len(), 3); + assert_eq!(list1.get(0), Some(Variant::from(3))); + assert_eq!(list1.get(1), Some(Variant::from(4))); + assert_eq!(list1.get(2), Some(Variant::from(5))); + + // Row 2: null + assert!(variant_array.is_null(2)); + + // Row 3: [] + let row3 = variant_array.value(3); + let list3 = row3.as_list().unwrap(); + assert_eq!(list3.len(), 0); + } + + #[test] + fn test_sliced_list_row_builder() { + use arrow::array::ListArray; + + // Create a list array: [[1, 2], [3, 4, 5], [6]] + let data = vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4), Some(5)]), + Some(vec![Some(6)]), + ]; + let list_array = ListArray::from_iter_primitive::(data); + + // Slice to get just the middle element: [[3, 4, 5]] + let sliced_array = list_array.slice(1, 1); + + let options = CastOptions::default(); + let mut row_builder = + 
make_arrow_to_variant_row_builder(sliced_array.data_type(), &sliced_array, &options) + .unwrap(); + let mut variant_array_builder = VariantArrayBuilder::new(sliced_array.len()); + + // Test the single row + let mut builder = variant_array_builder.variant_builder(); + row_builder.append_row(&mut builder, 0).unwrap(); + builder.finish(); + + let variant_array = variant_array_builder.build(); + + // Verify result + assert_eq!(variant_array.len(), 1); + + // Row 0: [3, 4, 5] + let row0 = variant_array.value(0); + let list0 = row0.as_list().unwrap(); + assert_eq!(list0.len(), 3); + assert_eq!(list0.get(0), Some(Variant::from(3))); + assert_eq!(list0.get(1), Some(Variant::from(4))); + assert_eq!(list0.get(2), Some(Variant::from(5))); + } + + #[test] + fn test_nested_list_row_builder() { + use arrow::array::ListArray; + use arrow::datatypes::Field; + + // Build the nested structure manually + let inner_field = Arc::new(Field::new("item", DataType::Int32, true)); + let inner_list_field = Arc::new(Field::new("item", DataType::List(inner_field), true)); + + let values_data = vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])]; + let values_list = ListArray::from_iter_primitive::(values_data); + + let outer_offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 2, 2].into()); + let outer_list = ListArray::new( + inner_list_field, + outer_offsets, + Arc::new(values_list), + Some(arrow::buffer::NullBuffer::from(vec![true, false])), + ); + + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(outer_list.data_type(), &outer_list, &options) + .unwrap(); + let mut variant_array_builder = VariantArrayBuilder::new(outer_list.len()); + + for i in 0..outer_list.len() { + let mut builder = variant_array_builder.variant_builder(); + row_builder.append_row(&mut builder, i).unwrap(); + builder.finish(); + } + + let variant_array = variant_array_builder.build(); + + // Verify results + assert_eq!(variant_array.len(), 2); + + // Row 0: [[1, 
2], [3]] + let row0 = variant_array.value(0); + let outer_list0 = row0.as_list().unwrap(); + assert_eq!(outer_list0.len(), 2); + + let inner_list0_0 = outer_list0.get(0).unwrap(); + let inner_list0_0 = inner_list0_0.as_list().unwrap(); + assert_eq!(inner_list0_0.len(), 2); + assert_eq!(inner_list0_0.get(0), Some(Variant::from(1))); + assert_eq!(inner_list0_0.get(1), Some(Variant::from(2))); + + let inner_list0_1 = outer_list0.get(1).unwrap(); + let inner_list0_1 = inner_list0_1.as_list().unwrap(); + assert_eq!(inner_list0_1.len(), 1); + assert_eq!(inner_list0_1.get(0), Some(Variant::from(3))); + + // Row 1: null + assert!(variant_array.is_null(1)); + } + + #[test] + fn test_map_row_builder() { + use arrow::array::{Int32Array, MapArray, StringArray, StructArray}; + use arrow::buffer::{NullBuffer, OffsetBuffer}; + use arrow::datatypes::{DataType, Field, Fields}; + use std::sync::Arc; + + // Create the entries struct array (key-value pairs) + let keys = StringArray::from(vec!["key1", "key2", "key3"]); + let values = Int32Array::from(vec![1, 2, 3]); + let entries_fields = Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, true), + ]); + let entries = StructArray::new( + entries_fields.clone(), + vec![Arc::new(keys), Arc::new(values)], + None, // No nulls in the entries themselves + ); + + // Create offsets for 4 maps: [0..1], [1..1], [1..1], [1..3] + // Map 0: {"key1": 1} (1 entry) + // Map 1: {} (0 entries - empty) + // Map 2: null (0 entries but NULL via null buffer) + // Map 3: {"key2": 2, "key3": 3} (2 entries) + let offsets = OffsetBuffer::new(vec![0, 1, 1, 1, 3].into()); + + // Create null buffer - map at index 2 is NULL + let null_buffer = Some(NullBuffer::from(vec![true, true, false, true])); + + // Create the map field + let map_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, // Keys are non-nullable + )); + + // Create MapArray using try_new + let map_array = 
MapArray::try_new( + map_field, + offsets, + entries, + null_buffer, + false, // not ordered + ) + .unwrap(); + + let variant_array = execute_row_builder_test(&map_array); + + // Map 0: {"key1": 1} + let map0 = variant_array.value(0); + let obj0 = map0.as_object().unwrap(); + assert_eq!(obj0.len(), 1); + assert_eq!(obj0.get("key1"), Some(Variant::from(1))); + + // Map 1: {} (empty object, not null) + let map1 = variant_array.value(1); + let obj1 = map1.as_object().unwrap(); + assert_eq!(obj1.len(), 0); // Empty object + + // Map 2: null (actual NULL) + assert!(variant_array.is_null(2)); + + // Map 3: {"key2": 2, "key3": 3} + let map3 = variant_array.value(3); + let obj3 = map3.as_object().unwrap(); + assert_eq!(obj3.len(), 2); + assert_eq!(obj3.get("key2"), Some(Variant::from(2))); + assert_eq!(obj3.get("key3"), Some(Variant::from(3))); + } + + #[test] + fn test_union_sparse_row_builder() { + use arrow::array::{Float64Array, Int32Array, StringArray, UnionArray}; + use arrow::buffer::ScalarBuffer; + use arrow::datatypes::{DataType, Field, UnionFields}; + use std::sync::Arc; + + // Create a sparse union array with mixed types (int, float, string) + let int_array = Int32Array::from(vec![Some(1), None, None, None, Some(34), None]); + let float_array = Float64Array::from(vec![None, Some(3.2), None, Some(32.5), None, None]); + let string_array = StringArray::from(vec![None, None, Some("hello"), None, None, None]); + let type_ids = [0, 1, 2, 1, 0, 0].into_iter().collect::>(); + + let union_fields = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("int_field", DataType::Int32, false), + Field::new("float_field", DataType::Float64, false), + Field::new("string_field", DataType::Utf8, false), + ], + ); + + let children: Vec> = vec![ + Arc::new(int_array), + Arc::new(float_array), + Arc::new(string_array), + ]; + + let union_array = UnionArray::try_new( + union_fields, + type_ids, + None, // Sparse union + children, + ) + .unwrap(); + + let variant_array = 
execute_row_builder_test(&union_array); + assert_eq!(variant_array.value(0), Variant::Int32(1)); + assert_eq!(variant_array.value(1), Variant::Double(3.2)); + assert_eq!(variant_array.value(2), Variant::from("hello")); + assert_eq!(variant_array.value(3), Variant::Double(32.5)); + assert_eq!(variant_array.value(4), Variant::Int32(34)); + assert!(variant_array.is_null(5)); + } + + #[test] + fn test_union_dense_row_builder() { + use arrow::array::{Float64Array, Int32Array, StringArray, UnionArray}; + use arrow::buffer::ScalarBuffer; + use arrow::datatypes::{DataType, Field, UnionFields}; + use std::sync::Arc; + + // Create a dense union array with mixed types (int, float, string) + let int_array = Int32Array::from(vec![Some(1), Some(34), None]); + let float_array = Float64Array::from(vec![3.2, 32.5]); + let string_array = StringArray::from(vec!["hello"]); + let type_ids = [0, 1, 2, 1, 0, 0].into_iter().collect::>(); + let offsets = [0, 0, 0, 1, 1, 2] + .into_iter() + .collect::>(); + + let union_fields = UnionFields::new( + vec![0, 1, 2], + vec![ + Field::new("int_field", DataType::Int32, false), + Field::new("float_field", DataType::Float64, false), + Field::new("string_field", DataType::Utf8, false), + ], + ); + + let children: Vec> = vec![ + Arc::new(int_array), + Arc::new(float_array), + Arc::new(string_array), + ]; + + let union_array = UnionArray::try_new( + union_fields, + type_ids, + Some(offsets), // Dense union + children, + ) + .unwrap(); + + // Test the row builder + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(union_array.data_type(), &union_array, &options) + .unwrap(); + + let mut variant_builder = VariantArrayBuilder::new(union_array.len()); + for i in 0..union_array.len() { + let mut builder = variant_builder.variant_builder(); + row_builder.append_row(&mut builder, i).unwrap(); + builder.finish(); + } + let variant_array = variant_builder.build(); + + assert_eq!(variant_array.len(), 6); + 
assert_eq!(variant_array.value(0), Variant::Int32(1)); + assert_eq!(variant_array.value(1), Variant::Double(3.2)); + assert_eq!(variant_array.value(2), Variant::from("hello")); + assert_eq!(variant_array.value(3), Variant::Double(32.5)); + assert_eq!(variant_array.value(4), Variant::Int32(34)); + assert!(variant_array.is_null(5)); + } + + #[test] + fn test_union_sparse_type_ids_row_builder() { + use arrow::array::{Int32Array, StringArray, UnionArray}; + use arrow::buffer::ScalarBuffer; + use arrow::datatypes::{DataType, Field, UnionFields}; + use std::sync::Arc; + + // Create a sparse union with non-contiguous type IDs (1, 3) + let int_array = Int32Array::from(vec![Some(42), None]); + let string_array = StringArray::from(vec![None, Some("test")]); + let type_ids = [1, 3].into_iter().collect::>(); + + let union_fields = UnionFields::new( + vec![1, 3], // Non-contiguous type IDs + vec![ + Field::new("int_field", DataType::Int32, false), + Field::new("string_field", DataType::Utf8, false), + ], + ); + + let children: Vec> = vec![Arc::new(int_array), Arc::new(string_array)]; + + let union_array = UnionArray::try_new( + union_fields, + type_ids, + None, // Sparse union + children, + ) + .unwrap(); + + // Test the row builder + let options = CastOptions::default(); + let mut row_builder = + make_arrow_to_variant_row_builder(union_array.data_type(), &union_array, &options) + .unwrap(); + + let mut variant_builder = VariantArrayBuilder::new(union_array.len()); + for i in 0..union_array.len() { + let mut builder = variant_builder.variant_builder(); + row_builder.append_row(&mut builder, i).unwrap(); + builder.finish(); + } + let variant_array = variant_builder.build(); + + // Verify results + assert_eq!(variant_array.len(), 2); + + // Row 0: int 42 (type_id = 1) + assert_eq!(variant_array.value(0), Variant::Int32(42)); + + // Row 1: string "test" (type_id = 3) + assert_eq!(variant_array.value(1), Variant::from("test")); + } + + #[test] + fn test_decimal32_row_builder() { + 
use arrow::array::Decimal32Array; + use parquet_variant::VariantDecimal4; + + // Test Decimal32Array with scale 2 (e.g., for currency: 12.34) + let decimal_array = Decimal32Array::from(vec![Some(1234), None, Some(-5678)]) + .with_precision_and_scale(9, 2) + .unwrap(); + + test_row_builder_basic( + &decimal_array, + vec![ + Some(Variant::from(VariantDecimal4::try_new(1234, 2).unwrap())), + None, + Some(Variant::from(VariantDecimal4::try_new(-5678, 2).unwrap())), + ], + ); + } + + #[test] + fn test_decimal128_row_builder() { + use arrow::array::Decimal128Array; + use parquet_variant::VariantDecimal16; + + // Test Decimal128Array with negative scale (multiply by 10^|scale|) + let decimal_array = Decimal128Array::from(vec![Some(123), None, Some(456)]) + .with_precision_and_scale(10, -2) + .unwrap(); + + test_row_builder_basic( + &decimal_array, + vec![ + Some(Variant::from(VariantDecimal16::try_new(12300, 0).unwrap())), + None, + Some(Variant::from(VariantDecimal16::try_new(45600, 0).unwrap())), + ], + ); + } + + #[test] + fn test_decimal256_overflow_row_builder() { + use arrow::array::Decimal256Array; + use arrow::datatypes::i256; + + // Test Decimal256Array with a value that overflows i128 + let large_value = i256::from_i128(i128::MAX) + i256::from(1); // Overflows i128 + let decimal_array = Decimal256Array::from(vec![Some(large_value), Some(i256::from(123))]) + .with_precision_and_scale(76, 3) + .unwrap(); + + test_row_builder_basic( + &decimal_array, + vec![ + Some(Variant::Null), // Overflow value becomes Null + Some(Variant::from(VariantDecimal16::try_new(123, 3).unwrap())), + ], + ); + } + + #[test] + fn test_binary_row_builder() { + use arrow::array::BinaryArray; + + let binary_data = vec![ + Some(b"hello".as_slice()), + None, + Some(b"\x00\x01\x02\xFF".as_slice()), + Some(b"".as_slice()), // Empty binary + ]; + let binary_array = BinaryArray::from(binary_data); + + test_row_builder_basic( + &binary_array, + vec![ + Some(Variant::from(b"hello".as_slice())), + 
None, + Some(Variant::from([0x00, 0x01, 0x02, 0xFF].as_slice())), + Some(Variant::from([].as_slice())), + ], + ); + } + + #[test] + fn test_binary_view_row_builder() { + use arrow::array::BinaryViewArray; + + let binary_data = vec![ + Some(b"short".as_slice()), + None, + Some(b"this is a longer binary view that exceeds inline storage".as_slice()), + ]; + let binary_view_array = BinaryViewArray::from(binary_data); + + test_row_builder_basic( + &binary_view_array, + vec![ + Some(Variant::from(b"short".as_slice())), + None, + Some(Variant::from( + b"this is a longer binary view that exceeds inline storage".as_slice(), + )), + ], + ); + } + + #[test] + fn test_fixed_size_binary_row_builder() { + use arrow::array::FixedSizeBinaryArray; + + let binary_data = vec![ + Some([0x01, 0x02, 0x03, 0x04]), + None, + Some([0xFF, 0xFE, 0xFD, 0xFC]), + ]; + let fixed_binary_array = + FixedSizeBinaryArray::try_from_sparse_iter_with_size(binary_data.into_iter(), 4) + .unwrap(); + + test_row_builder_basic( + &fixed_binary_array, + vec![ + Some(Variant::from([0x01, 0x02, 0x03, 0x04].as_slice())), + None, + Some(Variant::from([0xFF, 0xFE, 0xFD, 0xFC].as_slice())), + ], + ); + } + + #[test] + fn test_utf8_view_row_builder() { + use arrow::array::StringViewArray; + + let string_data = vec![ + Some("short"), + None, + Some("this is a much longer string that will be stored out-of-line in the buffer"), + ]; + let string_view_array = StringViewArray::from(string_data); + + test_row_builder_basic( + &string_view_array, + vec![ + Some(Variant::from("short")), + None, + Some(Variant::from( + "this is a much longer string that will be stored out-of-line in the buffer", + )), + ], + ); + } + + #[test] + fn test_timestamp_second_row_builder() { + use arrow::array::TimestampSecondArray; + + let timestamp_data = vec![ + Some(1609459200), // 2021-01-01 00:00:00 UTC + None, + Some(1640995200), // 2022-01-01 00:00:00 UTC + ]; + let timestamp_array = TimestampSecondArray::from(timestamp_data); + + let 
expected_naive1 = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc(); + let expected_naive2 = DateTime::from_timestamp(1640995200, 0).unwrap().naive_utc(); + + test_row_builder_basic( + &timestamp_array, + vec![ + Some(Variant::from(expected_naive1)), + None, + Some(Variant::from(expected_naive2)), + ], + ); + } + + #[test] + fn test_timestamp_with_timezone_row_builder() { + use arrow::array::TimestampMicrosecondArray; + use chrono::DateTime; + + let timestamp_data = vec![ + Some(1609459200000000), // 2021-01-01 00:00:00 UTC (in microseconds) + None, + Some(1640995200000000), // 2022-01-01 00:00:00 UTC (in microseconds) + ]; + let timezone = "UTC".to_string(); + let timestamp_array = + TimestampMicrosecondArray::from(timestamp_data).with_timezone(timezone); + + let expected_utc1 = DateTime::from_timestamp(1609459200, 0).unwrap(); + let expected_utc2 = DateTime::from_timestamp(1640995200, 0).unwrap(); + + test_row_builder_basic( + &timestamp_array, + vec![ + Some(Variant::from(expected_utc1)), + None, + Some(Variant::from(expected_utc2)), + ], + ); + } + + #[test] + fn test_timestamp_nanosecond_precision_row_builder() { + use arrow::array::TimestampNanosecondArray; + + let timestamp_data = vec![ + Some(1609459200123456789), // 2021-01-01 00:00:00.123456789 UTC + None, + Some(1609459200000000000), // 2021-01-01 00:00:00.000000000 UTC (no fractional seconds) + ]; + let timestamp_array = TimestampNanosecondArray::from(timestamp_data); + + let expected_with_nanos = DateTime::from_timestamp(1609459200, 123456789) + .unwrap() + .naive_utc(); + let expected_no_nanos = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc(); + + test_row_builder_basic( + &timestamp_array, + vec![ + Some(Variant::from(expected_with_nanos)), + None, + Some(Variant::from(expected_no_nanos)), + ], + ); + } + + #[test] + fn test_timestamp_millisecond_row_builder() { + use arrow::array::TimestampMillisecondArray; + + let timestamp_data = vec![ + Some(1609459200123), // 2021-01-01 00:00:00.123 UTC 
+ None, + Some(1609459200000), // 2021-01-01 00:00:00.000 UTC + ]; + let timestamp_array = TimestampMillisecondArray::from(timestamp_data); + + let expected_with_millis = DateTime::from_timestamp(1609459200, 123000000) + .unwrap() + .naive_utc(); + let expected_no_millis = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc(); + + test_row_builder_basic( + &timestamp_array, + vec![ + Some(Variant::from(expected_with_millis)), + None, + Some(Variant::from(expected_no_millis)), + ], + ); + } + + #[test] + fn test_date32_row_builder() { + use arrow::array::Date32Array; + use chrono::NaiveDate; + + let date_data = vec![ + Some(0), // 1970-01-01 + None, + Some(19723), // 2024-01-01 (days since epoch) + Some(-719162), // 0001-01-01 (near minimum) + ]; + let date_array = Date32Array::from(date_data); + + let expected_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let expected_2024 = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(); + let expected_min = NaiveDate::from_ymd_opt(1, 1, 1).unwrap(); + + test_row_builder_basic( + &date_array, + vec![ + Some(Variant::from(expected_epoch)), + None, + Some(Variant::from(expected_2024)), + Some(Variant::from(expected_min)), + ], + ); + } + + #[test] + fn test_date64_row_builder() { + use arrow::array::Date64Array; + use chrono::NaiveDate; + + // Test Date64Array with various dates (milliseconds since epoch) + let date_data = vec![ + Some(0), // 1970-01-01 + None, + Some(1704067200000), // 2024-01-01 (milliseconds since epoch) + Some(86400000), // 1970-01-02 + ]; + let date_array = Date64Array::from(date_data); + + let expected_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let expected_2024 = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(); + let expected_next_day = NaiveDate::from_ymd_opt(1970, 1, 2).unwrap(); + + test_row_builder_basic( + &date_array, + vec![ + Some(Variant::from(expected_epoch)), + None, + Some(Variant::from(expected_2024)), + Some(Variant::from(expected_next_day)), + ], + ); + } + + #[test] + fn 
test_time32_second_row_builder() { + use arrow::array::Time32SecondArray; + use chrono::NaiveTime; + + // Test Time32SecondArray with various times (seconds since midnight) + let time_data = vec![ + Some(0), // 00:00:00 + None, + Some(3661), // 01:01:01 + Some(86399), // 23:59:59 + ]; + let time_array = Time32SecondArray::from(time_data); + + let expected_midnight = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); + let expected_time = NaiveTime::from_hms_opt(1, 1, 1).unwrap(); + let expected_last = NaiveTime::from_hms_opt(23, 59, 59).unwrap(); + + test_row_builder_basic( + &time_array, + vec![ + Some(Variant::from(expected_midnight)), + None, + Some(Variant::from(expected_time)), + Some(Variant::from(expected_last)), + ], + ); + } + + #[test] + fn test_time32_millisecond_row_builder() { + use arrow::array::Time32MillisecondArray; + use chrono::NaiveTime; + + // Test Time32MillisecondArray with various times (milliseconds since midnight) + let time_data = vec![ + Some(0), // 00:00:00.000 + None, + Some(3661123), // 01:01:01.123 + Some(86399999), // 23:59:59.999 + ]; + let time_array = Time32MillisecondArray::from(time_data); + + let expected_midnight = NaiveTime::from_hms_milli_opt(0, 0, 0, 0).unwrap(); + let expected_time = NaiveTime::from_hms_milli_opt(1, 1, 1, 123).unwrap(); + let expected_last = NaiveTime::from_hms_milli_opt(23, 59, 59, 999).unwrap(); + + test_row_builder_basic( + &time_array, + vec![ + Some(Variant::from(expected_midnight)), + None, + Some(Variant::from(expected_time)), + Some(Variant::from(expected_last)), + ], + ); + } + + #[test] + fn test_time64_microsecond_row_builder() { + use arrow::array::Time64MicrosecondArray; + use chrono::NaiveTime; + + // Test Time64MicrosecondArray with various times (microseconds since midnight) + let time_data = vec![ + Some(0), // 00:00:00.000000 + None, + Some(3661123456), // 01:01:01.123456 + Some(86399999999), // 23:59:59.999999 + ]; + let time_array = Time64MicrosecondArray::from(time_data); + + let 
expected_midnight = NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap(); + let expected_time = NaiveTime::from_hms_micro_opt(1, 1, 1, 123456).unwrap(); + let expected_last = NaiveTime::from_hms_micro_opt(23, 59, 59, 999999).unwrap(); + + test_row_builder_basic( + &time_array, + vec![ + Some(Variant::from(expected_midnight)), + None, + Some(Variant::from(expected_time)), + Some(Variant::from(expected_last)), + ], + ); + } + + #[test] + fn test_time64_nanosecond_row_builder() { + use arrow::array::Time64NanosecondArray; + use chrono::NaiveTime; + + // Test Time64NanosecondArray with various times (nanoseconds since midnight) + let time_data = vec![ + Some(0), // 00:00:00.000000000 + None, + Some(3661123456789), // 01:01:01.123456789 + Some(86399999999999), // 23:59:59.999999999 + ]; + let time_array = Time64NanosecondArray::from(time_data); + + let expected_midnight = NaiveTime::from_hms_nano_opt(0, 0, 0, 0).unwrap(); + // Nanoseconds are truncated to microsecond precision in Variant + let expected_time = NaiveTime::from_hms_micro_opt(1, 1, 1, 123456).unwrap(); + let expected_last = NaiveTime::from_hms_micro_opt(23, 59, 59, 999999).unwrap(); + + test_row_builder_basic( + &time_array, + vec![ + Some(Variant::from(expected_midnight)), + None, + Some(Variant::from(expected_time)), + Some(Variant::from(expected_last)), + ], + ); + } +} diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs index 231d36f96e82..3499470f5903 100644 --- a/parquet-variant-compute/src/cast_to_variant.rs +++ b/parquet-variant-compute/src/cast_to_variant.rs @@ -15,131 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashMap; -use std::sync::Arc; - -use crate::type_conversion::{ - decimal_to_variant_decimal, generic_conversion_array, non_generic_conversion_array, - primitive_conversion_array, timestamp_to_variant_timestamp, -}; -use crate::{VariantArray, VariantArrayBuilder}; -use arrow::array::{ - Array, AsArray, OffsetSizeTrait, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; -use arrow::buffer::{OffsetBuffer, ScalarBuffer}; -use arrow::compute::kernels::cast; -use arrow::datatypes::{ - i256, ArrowNativeType, BinaryType, BinaryViewType, Date32Type, Date64Type, Decimal128Type, - Decimal256Type, Decimal32Type, Decimal64Type, Float16Type, Float32Type, Float64Type, Int16Type, - Int32Type, Int64Type, Int8Type, LargeBinaryType, RunEndIndexType, Time32MillisecondType, - Time32SecondType, Time64MicrosecondType, Time64NanosecondType, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, -}; -use arrow::temporal_conversions::{ - timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime, - timestamp_us_to_datetime, -}; -use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit, UnionFields}; -use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc}; -use parquet_variant::{ - Variant, VariantBuilder, VariantDecimal16, VariantDecimal4, VariantDecimal8, -}; - -/// Options for controlling the behavior of `cast_to_variant_with_options`. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct CastOptions { - /// If true, return error on conversion failure. If false, insert null for failed conversions. 
- pub strict: bool, -} - -impl Default for CastOptions { - fn default() -> Self { - Self { strict: true } - } -} - -fn convert_timestamp_with_options( - time_unit: &TimeUnit, - time_zone: &Option>, - input: &dyn Array, - builder: &mut VariantArrayBuilder, - options: &CastOptions, -) -> Result<(), ArrowError> { - let native_datetimes: Vec> = match time_unit { - arrow_schema::TimeUnit::Second => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampSecondArray"); - timestamp_to_variant_timestamp!( - ts_array, - timestamp_s_to_datetime, - "seconds", - options.strict - ) - } - arrow_schema::TimeUnit::Millisecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampMillisecondArray"); - timestamp_to_variant_timestamp!( - ts_array, - timestamp_ms_to_datetime, - "milliseconds", - options.strict - ) - } - arrow_schema::TimeUnit::Microsecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampMicrosecondArray"); - timestamp_to_variant_timestamp!( - ts_array, - timestamp_us_to_datetime, - "microseconds", - options.strict - ) - } - arrow_schema::TimeUnit::Nanosecond => { - let ts_array = input - .as_any() - .downcast_ref::() - .expect("Array is not TimestampNanosecondArray"); - timestamp_to_variant_timestamp!( - ts_array, - timestamp_ns_to_datetime, - "nanoseconds", - options.strict - ) - } - }; - - for (i, x) in native_datetimes.iter().enumerate() { - match x { - Some(ndt) => { - if time_zone.is_none() { - builder.append_variant((*ndt).into()); - } else { - let utc_dt: DateTime = Utc.from_utc_datetime(ndt); - builder.append_variant(utc_dt.into()); - } - } - None if options.strict && input.is_valid(i) => { - return Err(ArrowError::ComputeError(format!( - "Failed to convert timestamp at index {}: invalid timestamp value", - i - ))); - } - None => { - builder.append_null(); - } - } - } - Ok(()) -} +use crate::arrow_to_variant::make_arrow_to_variant_row_builder; 
+use crate::{CastOptions, VariantArray, VariantArrayBuilder}; +use arrow::array::Array; +use arrow_schema::ArrowError; /// Casts a typed arrow [`Array`] to a [`VariantArray`]. This is useful when you /// need to convert a specific data type @@ -178,489 +57,34 @@ pub fn cast_to_variant_with_options( input: &dyn Array, options: &CastOptions, ) -> Result { - let mut builder = VariantArrayBuilder::new(input.len()); - - let input_type = input.data_type(); - match input_type { - DataType::Null => { - for _ in 0..input.len() { - builder.append_null(); - } - } - DataType::Boolean => { - non_generic_conversion_array!(input.as_boolean(), |v| v, builder); - } - DataType::Int8 => { - primitive_conversion_array!(Int8Type, input, builder); - } - DataType::Int16 => { - primitive_conversion_array!(Int16Type, input, builder); - } - DataType::Int32 => { - primitive_conversion_array!(Int32Type, input, builder); - } - DataType::Int64 => { - primitive_conversion_array!(Int64Type, input, builder); - } - DataType::UInt8 => { - primitive_conversion_array!(UInt8Type, input, builder); - } - DataType::UInt16 => { - primitive_conversion_array!(UInt16Type, input, builder); - } - DataType::UInt32 => { - primitive_conversion_array!(UInt32Type, input, builder); - } - DataType::UInt64 => { - primitive_conversion_array!(UInt64Type, input, builder); - } - DataType::Float16 => { - generic_conversion_array!(Float16Type, as_primitive, f32::from, input, builder); - } - DataType::Float32 => { - primitive_conversion_array!(Float32Type, input, builder); - } - DataType::Float64 => { - primitive_conversion_array!(Float64Type, input, builder); - } - DataType::Decimal32(_, scale) => { - generic_conversion_array!( - Decimal32Type, - as_primitive, - |v| decimal_to_variant_decimal!(v, scale, i32, VariantDecimal4), - input, - builder - ); - } - DataType::Decimal64(_, scale) => { - generic_conversion_array!( - Decimal64Type, - as_primitive, - |v| decimal_to_variant_decimal!(v, scale, i64, VariantDecimal8), - input, 
- builder - ); - } - DataType::Decimal128(_, scale) => { - generic_conversion_array!( - Decimal128Type, - as_primitive, - |v| decimal_to_variant_decimal!(v, scale, i128, VariantDecimal16), - input, - builder - ); - } - DataType::Decimal256(_, scale) => { - generic_conversion_array!( - Decimal256Type, - as_primitive, - |v: i256| { - // Since `i128::MAX` is larger than the max value of `VariantDecimal16`, - // any `i256` value that cannot be cast to `i128` is unable to be cast to `VariantDecimal16` either. - // Therefore, we can safely convert `i256` to `i128` first and process it like `i128`. - if let Some(v) = v.to_i128() { - decimal_to_variant_decimal!(v, scale, i128, VariantDecimal16) - } else { - Variant::Null - } - }, - input, - builder - ); - } - DataType::Timestamp(time_unit, time_zone) => { - convert_timestamp_with_options(time_unit, time_zone, input, &mut builder, options)?; - } - DataType::Time32(unit) => { - match *unit { - TimeUnit::Second => { - generic_conversion_array!( - Time32SecondType, - as_primitive, - // nano second are always 0 - |v| NaiveTime::from_num_seconds_from_midnight_opt(v as u32, 0u32), - input, - builder, - options.strict - )?; - } - TimeUnit::Millisecond => { - generic_conversion_array!( - Time32MillisecondType, - as_primitive, - |v| NaiveTime::from_num_seconds_from_midnight_opt( - v as u32 / 1000, - (v as u32 % 1000) * 1_000_000 - ), - input, - builder, - options.strict - )?; - } - _ => { - return Err(ArrowError::CastError(format!( - "Unsupported Time32 unit: {:?}", - unit - ))); - } - }; - } - DataType::Time64(unit) => { - match *unit { - TimeUnit::Microsecond => { - generic_conversion_array!( - Time64MicrosecondType, - as_primitive, - |v| NaiveTime::from_num_seconds_from_midnight_opt( - (v / 1_000_000) as u32, - (v % 1_000_000 * 1_000) as u32 - ), - input, - builder, - options.strict - )?; - } - TimeUnit::Nanosecond => { - generic_conversion_array!( - Time64NanosecondType, - as_primitive, - |v| 
NaiveTime::from_num_seconds_from_midnight_opt( - (v / 1_000_000_000) as u32, - (v % 1_000_000_000) as u32 - ), - input, - builder, - options.strict - )?; - } - _ => { - return Err(ArrowError::CastError(format!( - "Unsupported Time64 unit: {:?}", - unit - ))); - } - }; - } - DataType::Duration(_) | DataType::Interval(_) => { - return Err(ArrowError::InvalidArgumentError( - "Casting duration/interval types to Variant is not supported. \ - The Variant format does not define duration/interval types." - .to_string(), - )); - } - DataType::Binary => { - generic_conversion_array!(BinaryType, as_bytes, |v| v, input, builder); - } - DataType::LargeBinary => { - generic_conversion_array!(LargeBinaryType, as_bytes, |v| v, input, builder); - } - DataType::BinaryView => { - generic_conversion_array!(BinaryViewType, as_byte_view, |v| v, input, builder); - } - DataType::FixedSizeBinary(_) => { - non_generic_conversion_array!(input.as_fixed_size_binary(), |v| v, builder); - } - DataType::Utf8 => { - generic_conversion_array!(i32, as_string, |v| v, input, builder); - } - DataType::LargeUtf8 => { - generic_conversion_array!(i64, as_string, |v| v, input, builder); - } - DataType::Utf8View => { - non_generic_conversion_array!(input.as_string_view(), |v| v, builder); - } - DataType::Date32 => { - generic_conversion_array!( - Date32Type, - as_primitive, - |v: i32| -> NaiveDate { Date32Type::to_naive_date(v) }, - input, - builder - ); - } - DataType::Date64 => { - generic_conversion_array!( - Date64Type, - as_primitive, - |v: i64| Date64Type::to_naive_date_opt(v), - input, - builder, - options.strict - )?; - } - DataType::List(_) => convert_list::(input, &mut builder)?, - DataType::LargeList(_) => convert_list::(input, &mut builder)?, - DataType::Struct(_) => convert_struct(input, &mut builder)?, - DataType::Map(field, _) => convert_map(field, input, &mut builder)?, - DataType::Union(fields, _) => convert_union(fields, input, &mut builder)?, - DataType::Dictionary(_, _) => 
convert_dictionary_encoded(input, &mut builder)?, - DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() { - DataType::Int16 => convert_run_end_encoded::(input, &mut builder)?, - DataType::Int32 => convert_run_end_encoded::(input, &mut builder)?, - DataType::Int64 => convert_run_end_encoded::(input, &mut builder)?, - _ => { - return Err(ArrowError::CastError(format!( - "Unsupported run ends type: {:?}", - run_ends.data_type() - ))); - } - }, - dt => { - return Err(ArrowError::CastError(format!( - "Unsupported data type for casting to Variant: {dt:?}", - ))); - } - }; - Ok(builder.build()) -} - -/// Generic function to convert list arrays (both List and LargeList) to variant arrays -fn convert_list( - input: &dyn Array, - builder: &mut VariantArrayBuilder, -) -> Result<(), ArrowError> { - let list_array = input.as_list::(); - let values = list_array.values(); - let offsets = list_array.offsets(); - - let first_offset = *offsets.first().expect("There should be an offset"); - let length = *offsets.last().expect("There should be an offset") - first_offset; - let sliced_values = values.slice(first_offset.as_usize(), length.as_usize()); - - let values_variant_array = cast_to_variant(sliced_values.as_ref())?; - let new_offsets = OffsetBuffer::new(ScalarBuffer::from_iter( - offsets.iter().map(|o| *o - first_offset), - )); - - for i in 0..list_array.len() { - if list_array.is_null(i) { - builder.append_null(); - continue; - } - - let start = new_offsets[i].as_usize(); - let end = new_offsets[i + 1].as_usize(); - - // Start building the inner VariantList - let mut variant_builder = VariantBuilder::new(); - let mut list_builder = variant_builder.new_list(); - - // Add all values from the slice - for j in start..end { - list_builder.append_value(values_variant_array.value(j)); - } - - list_builder.finish(); - - let (metadata, value) = variant_builder.finish(); - let variant = Variant::new(&metadata, &value); - builder.append_variant(variant) - } - - Ok(()) -} - 
-fn convert_struct(input: &dyn Array, builder: &mut VariantArrayBuilder) -> Result<(), ArrowError> { - let struct_array = input.as_struct(); - - // Pre-convert all field arrays once for better performance - // This avoids converting the same field array multiple times - // Alternative approach: Use slicing per row: field_array.slice(i, 1) - // However, pre-conversion is more efficient for typical use cases - let field_variant_arrays: Result, _> = struct_array - .columns() - .iter() - .map(|field_array| cast_to_variant(field_array.as_ref())) - .collect(); - let field_variant_arrays = field_variant_arrays?; - - // Cache column names to avoid repeated calls - let column_names = struct_array.column_names(); - - for i in 0..struct_array.len() { - if struct_array.is_null(i) { - builder.append_null(); - continue; - } - - // Create a VariantBuilder for this struct instance - let mut variant_builder = VariantBuilder::new(); - let mut object_builder = variant_builder.new_object(); - - // Iterate through all fields in the struct - for (field_idx, field_name) in column_names.iter().enumerate() { - // Use pre-converted field variant arrays for better performance - // Check nulls directly from the pre-converted arrays instead of accessing column again - if !field_variant_arrays[field_idx].is_null(i) { - let field_variant = field_variant_arrays[field_idx].value(i); - object_builder.insert(field_name, field_variant); - } - // Note: we skip null fields rather than inserting Variant::Null - // to match Arrow struct semantics where null fields are omitted - } - - object_builder.finish(); - let (metadata, value) = variant_builder.finish(); - let variant = Variant::try_new(&metadata, &value)?; - builder.append_variant(variant); - } - - Ok(()) -} - -fn convert_map( - field: &FieldRef, - input: &dyn Array, - builder: &mut VariantArrayBuilder, -) -> Result<(), ArrowError> { - match field.data_type() { - DataType::Struct(_) => { - let map_array = input.as_map(); - let keys = 
cast(map_array.keys(), &DataType::Utf8)?; - let key_strings = keys.as_string::(); - let values = cast_to_variant(map_array.values())?; - let offsets = map_array.offsets(); - - let mut start_offset = offsets[0]; - for end_offset in offsets.iter().skip(1) { - if start_offset >= *end_offset { - builder.append_null(); - continue; - } - - let length = (end_offset - start_offset) as usize; - - let mut variant_builder = VariantBuilder::new(); - let mut object_builder = variant_builder.new_object(); - - for i in start_offset..*end_offset { - let value = values.value(i as usize); - object_builder.insert(key_strings.value(i as usize), value); - } - object_builder.finish(); - let (metadata, value) = variant_builder.finish(); - let variant = Variant::try_new(&metadata, &value)?; + // Create row builder for the input array type + let mut row_builder = make_arrow_to_variant_row_builder(input.data_type(), input, options)?; - builder.append_variant(variant); + // Create output array builder + let mut array_builder = VariantArrayBuilder::new(input.len()); - start_offset += length as i32; - } - } - _ => { - return Err(ArrowError::CastError(format!( - "Unsupported map field type for casting to Variant: {field:?}", - ))); - } + // Process each row using the row builder + for i in 0..input.len() { + let mut builder = array_builder.variant_builder(); + row_builder.append_row(&mut builder, i)?; + builder.finish(); } - Ok(()) + Ok(array_builder.build()) } -/// Convert an array to a `VariantArray` with strict mode enabled (returns errors on conversion failures). +/// Convert an array to a [`VariantArray`] with strict mode enabled (returns errors on conversion +/// failures). /// /// This function provides backward compatibility. For non-strict behavior, -/// use `cast_to_variant_with_options` with `CastOptions { strict: false }`. +/// use [`cast_to_variant_with_options`] with `CastOptions { strict: false }`. 
pub fn cast_to_variant(input: &dyn Array) -> Result { cast_to_variant_with_options(input, &CastOptions::default()) } -/// Convert union arrays -fn convert_union( - fields: &UnionFields, - input: &dyn Array, - builder: &mut VariantArrayBuilder, -) -> Result<(), ArrowError> { - let union_array = input.as_union(); - - // Convert each child array to variant arrays - let mut child_variant_arrays = HashMap::new(); - for (type_id, _) in fields.iter() { - let child_array = union_array.child(type_id); - let child_variant_array = cast_to_variant(child_array.as_ref())?; - child_variant_arrays.insert(type_id, child_variant_array); - } - - // Process each element in the union array - for i in 0..union_array.len() { - let type_id = union_array.type_id(i); - let value_offset = union_array.value_offset(i); - - if let Some(child_variant_array) = child_variant_arrays.get(&type_id) { - if child_variant_array.is_null(value_offset) { - builder.append_null(); - } else { - let value = child_variant_array.value(value_offset); - builder.append_variant(value); - } - } else { - // This should not happen in a valid union, but handle gracefully - builder.append_null(); - } - } - - Ok(()) -} - -fn convert_dictionary_encoded( - input: &dyn Array, - builder: &mut VariantArrayBuilder, -) -> Result<(), ArrowError> { - let dict_array = input.as_any_dictionary(); - let values_variant_array = cast_to_variant(dict_array.values().as_ref())?; - let normalized_keys = dict_array.normalized_keys(); - let keys = dict_array.keys(); - - for (i, key_idx) in normalized_keys.iter().enumerate() { - if keys.is_null(i) { - builder.append_null(); - continue; - } - - if values_variant_array.is_null(*key_idx) { - builder.append_null(); - continue; - } - - let value = values_variant_array.value(*key_idx); - builder.append_variant(value); - } - - Ok(()) -} - -fn convert_run_end_encoded( - input: &dyn Array, - builder: &mut VariantArrayBuilder, -) -> Result<(), ArrowError> { - let run_array = input.as_run::(); - let 
values_variant_array = cast_to_variant(run_array.values().as_ref())?; - - // Process runs in batches for better performance - let run_ends = run_array.run_ends().values(); - let mut logical_start = 0; - - for (physical_idx, &run_end) in run_ends.iter().enumerate() { - let logical_end = run_end.as_usize(); - let run_length = logical_end - logical_start; - - if values_variant_array.is_null(physical_idx) { - // Append nulls for the entire run - for _ in 0..run_length { - builder.append_null(); - } - } else { - // Get the value once and append it for the entire run - let value = values_variant_array.value(physical_idx); - for _ in 0..run_length { - builder.append_variant(value.clone()); - } - } - - logical_start = logical_end; - } - - Ok(()) -} +// TODO do we need a cast_with_options to allow specifying conversion behavior, +// e.g. how to handle overflows, whether to convert to Variant::Null or return +// an error, etc. ? #[cfg(test)] mod tests { @@ -674,17 +98,24 @@ mod tests { IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, LargeStringArray, ListArray, MapArray, NullArray, StringArray, StringRunBuilder, StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray, - Time64MicrosecondArray, Time64NanosecondArray, TimestampSecondArray, UInt16Array, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, UnionArray, }; use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; - use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano}; + use arrow::datatypes::{ + i256, BinaryType, BinaryViewType, Date32Type, Date64Type, Int32Type, Int64Type, Int8Type, + IntervalDayTime, IntervalMonthDayNano, LargeBinaryType, + }; use arrow_schema::{DataType, Field, Fields, UnionFields}; use arrow_schema::{ DECIMAL128_MAX_PRECISION, DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION, 
}; + use chrono::{DateTime, NaiveDate, NaiveTime}; use half::f16; - use parquet_variant::{Variant, VariantDecimal16}; + use parquet_variant::{ + Variant, VariantBuilder, VariantDecimal16, VariantDecimal4, VariantDecimal8, + }; use std::{sync::Arc, vec}; macro_rules! max_unscaled_value { @@ -2139,33 +1570,64 @@ mod tests { } #[test] - fn test_cast_to_variant_map_with_nulls() { - let keys = vec!["key1", "key2", "key3"]; - let values_data = Int32Array::from(vec![1, 2, 3]); - let entry_offsets = vec![0, 1, 1, 3]; - let map_array = - MapArray::new_from_strings(keys.clone().into_iter(), &values_data, &entry_offsets) - .unwrap(); + fn test_cast_to_variant_map_with_nulls_and_empty() { + use arrow::array::{Int32Array, MapArray, StringArray, StructArray}; + use arrow::buffer::{NullBuffer, OffsetBuffer}; + use arrow::datatypes::{DataType, Field, Fields}; + use std::sync::Arc; + + // Create entries struct array + let keys = StringArray::from(vec!["key1", "key2", "key3"]); + let values = Int32Array::from(vec![1, 2, 3]); + let entries_fields = Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, true), + ]); + let entries = StructArray::new( + entries_fields.clone(), + vec![Arc::new(keys), Arc::new(values)], + None, + ); + + // Create offsets for 4 maps: [0..1], [1..1], [1..1], [1..3] + let offsets = OffsetBuffer::new(vec![0, 1, 1, 1, 3].into()); + + // Create null buffer - map at index 2 is NULL + let null_buffer = Some(NullBuffer::from(vec![true, true, false, true])); + + let map_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, + )); + + let map_array = MapArray::try_new(map_field, offsets, entries, null_buffer, false).unwrap(); let result = cast_to_variant(&map_array).unwrap(); - // [{"key1":1}] - let variant1 = result.value(0); + + // Map 0: {"key1": 1} + let variant0 = result.value(0); assert_eq!( - variant1.as_object().unwrap().get("key1").unwrap(), + 
variant0.as_object().unwrap().get("key1").unwrap(), Variant::from(1) ); - // None - assert!(result.is_null(1)); + // Map 1: {} (empty, not null) + let variant1 = result.value(1); + let obj1 = variant1.as_object().unwrap(); + assert_eq!(obj1.len(), 0); // Empty object - // [{"key2":2},{"key3":3}] - let variant2 = result.value(2); + // Map 2: null (actual NULL) + assert!(result.is_null(2)); + + // Map 3: {"key2": 2, "key3": 3} + let variant3 = result.value(3); assert_eq!( - variant2.as_object().unwrap().get("key2").unwrap(), + variant3.as_object().unwrap().get("key2").unwrap(), Variant::from(2) ); assert_eq!( - variant2.as_object().unwrap().get("key3").unwrap(), + variant3.as_object().unwrap().get("key3").unwrap(), Variant::from(3) ); } @@ -2448,6 +1910,8 @@ mod tests { #[test] fn test_cast_to_variant_non_strict_mode_timestamp() { + use arrow::temporal_conversions::timestamp_s_to_datetime; + let ts_array = TimestampSecondArray::from(vec![Some(i64::MAX), Some(0), Some(1609459200)]) .with_timezone_opt(None::<&str>); diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 3c928636ac34..e9a6e0c49f10 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -35,6 +35,7 @@ //! [`VariantPath`]: parquet_variant::VariantPath //! 
[Variant issue]: https://github.com/apache/arrow-rs/issues/6736 +mod arrow_to_variant; pub mod cast_to_variant; mod from_json; mod to_json; @@ -46,6 +47,7 @@ pub mod variant_get; pub use variant_array::{ShreddingState, VariantArray}; pub use variant_array_builder::{VariantArrayBuilder, VariantArrayVariantBuilder}; -pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options, CastOptions}; +pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options}; pub use from_json::json_to_variant; pub use to_json::variant_to_json; +pub use type_conversion::CastOptions; diff --git a/parquet-variant-compute/src/type_conversion.rs b/parquet-variant-compute/src/type_conversion.rs index aa60b425a18b..d2a63f49de16 100644 --- a/parquet-variant-compute/src/type_conversion.rs +++ b/parquet-variant-compute/src/type_conversion.rs @@ -17,46 +17,18 @@ //! Module for transforming a typed arrow `Array` to `VariantArray`. -/// Convert the input array to a `VariantArray` row by row, using `method` -/// not requiring a generic type to downcast the generic array to a specific -/// array type and `cast_fn` to transform each element to a type compatible with Variant -/// If `strict` is true(default), return error on conversion failure. If false, insert null. -macro_rules! 
non_generic_conversion_array { - ($array:expr, $cast_fn:expr, $builder:expr) => {{ - let array = $array; - for i in 0..array.len() { - if array.is_null(i) { - $builder.append_null(); - continue; - } - let cast_value = $cast_fn(array.value(i)); - $builder.append_variant(Variant::from(cast_value)); - } - }}; - ($array:expr, $cast_fn:expr, $builder:expr, $strict:expr) => {{ - let array = $array; - for i in 0..array.len() { - if array.is_null(i) { - $builder.append_null(); - continue; - } - match $cast_fn(array.value(i)) { - Some(cast_value) => { - $builder.append_variant(Variant::from(cast_value)); - } - None if $strict => { - return Err(ArrowError::ComputeError(format!( - "Failed to convert value at index {}: conversion failed", - i - ))); - } - None => $builder.append_null(), - } - } - Ok::<(), ArrowError>(()) - }}; +/// Options for controlling the behavior of `cast_to_variant_with_options`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CastOptions { + /// If true, return error on conversion failure. If false, insert null for failed conversions. + pub strict: bool, +} + +impl Default for CastOptions { + fn default() -> Self { + Self { strict: true } + } } -pub(crate) use non_generic_conversion_array; /// Convert the value at a specific index in the given array into a `Variant`. macro_rules! non_generic_conversion_single_value { @@ -72,29 +44,6 @@ macro_rules! non_generic_conversion_single_value { } pub(crate) use non_generic_conversion_single_value; -/// Convert the input array to a `VariantArray` row by row, using `method` -/// requiring a generic type to downcast the generic array to a specific -/// array type and `cast_fn` to transform each element to a type compatible with Variant -/// If `strict` is true(default), return error on conversion failure. If false, insert null. -macro_rules! 
generic_conversion_array { - ($t:ty, $method:ident, $cast_fn:expr, $input:expr, $builder:expr) => {{ - $crate::type_conversion::non_generic_conversion_array!( - $input.$method::<$t>(), - $cast_fn, - $builder - ) - }}; - ($t:ty, $method:ident, $cast_fn:expr, $input:expr, $builder:expr, $strict:expr) => {{ - $crate::type_conversion::non_generic_conversion_array!( - $input.$method::<$t>(), - $cast_fn, - $builder, - $strict - ) - }}; -} -pub(crate) use generic_conversion_array; - /// Convert the value at a specific index in the given array into a `Variant`, /// using `method` requiring a generic type to downcast the generic array /// to a specific array type and `cast_fn` to transform the element. @@ -109,21 +58,6 @@ macro_rules! generic_conversion_single_value { } pub(crate) use generic_conversion_single_value; -/// Convert the input array of a specific primitive type to a `VariantArray` -/// row by row -macro_rules! primitive_conversion_array { - ($t:ty, $input:expr, $builder:expr) => {{ - $crate::type_conversion::generic_conversion_array!( - $t, - as_primitive, - |v| v, - $input, - $builder - ) - }}; -} -pub(crate) use primitive_conversion_array; - /// Convert the value at a specific index in the given array into a `Variant`. macro_rules! primitive_conversion_single_value { ($t:ty, $input:expr, $index:expr) => {{ @@ -155,19 +89,3 @@ macro_rules! decimal_to_variant_decimal { }}; } pub(crate) use decimal_to_variant_decimal; - -/// Convert a timestamp value to a `VariantTimestamp` -macro_rules! timestamp_to_variant_timestamp { - ($ts_array:expr, $converter:expr, $unit_name:expr, $strict:expr) => { - if $strict { - let error = - || ArrowError::ComputeError(format!("Invalid timestamp {} value", $unit_name)); - let converter = |x| $converter(x).ok_or_else(error); - let iter = $ts_array.iter().map(|x| x.map(converter).transpose()); - iter.collect::, ArrowError>>()? 
- } else { - $ts_array.iter().map(|x| x.and_then($converter)).collect() - } - }; -} -pub(crate) use timestamp_to_variant_timestamp; diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index d5f578421ed3..aa3e1dbdfcfe 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -199,9 +199,14 @@ pub struct VariantArrayVariantBuilder<'a> { metadata_offsets: &'a mut Vec, value_offsets: &'a mut Vec, nulls: &'a mut NullBufferBuilder, + is_null: bool, } impl VariantBuilderExt for VariantArrayVariantBuilder<'_> { + /// Appending NULL to a variant array produces an actual NULL value + fn append_null(&mut self) { + self.is_null = true; + } fn append_value<'m, 'v>(&mut self, value: impl Into>) { ValueBuilder::append_variant(self.parent_state(), value.into()); } @@ -228,6 +233,7 @@ impl<'a> VariantArrayVariantBuilder<'a> { metadata_offsets: &mut builder.metadata_offsets, value_offsets: &mut builder.value_offsets, nulls: &mut builder.nulls, + is_null: false, } } @@ -239,10 +245,20 @@ impl<'a> VariantArrayVariantBuilder<'a> { pub fn finish(mut self) { // Record the ending offsets after finishing metadata and finish the parent state. 
let (value_builder, metadata_builder) = self.parent_state.value_and_metadata_builders(); - self.metadata_offsets.push(metadata_builder.finish()); - self.value_offsets.push(value_builder.offset()); - self.nulls.append_non_null(); - self.parent_state.finish(); + let (metadata_offset, value_offset, not_null) = if self.is_null { + // Do not `finish`, just repeat the previous offset for a physically empty result + let metadata_offset = self.metadata_offsets.last().copied().unwrap_or(0); + let value_offset = self.value_offsets.last().copied().unwrap_or(0); + (metadata_offset, value_offset, false) + } else { + let metadata_offset = metadata_builder.finish(); + let value_offset = value_builder.offset(); + self.parent_state.finish(); + (metadata_offset, value_offset, true) + }; + self.metadata_offsets.push(metadata_offset); + self.value_offsets.push(value_offset); + self.nulls.append(not_null); } fn parent_state(&mut self) -> ParentState<'_> { diff --git a/parquet-variant-json/src/from_json.rs b/parquet-variant-json/src/from_json.rs index 90b26f7d307b..3a6e869ec1fc 100644 --- a/parquet-variant-json/src/from_json.rs +++ b/parquet-variant-json/src/from_json.rs @@ -18,7 +18,7 @@ //! 
Module for parsing JSON strings as Variant use arrow_schema::ArrowError; -use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilderExt}; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt}; use serde_json::{Number, Value}; /// Converts a JSON string to Variant using a [`VariantBuilderExt`], such as @@ -120,10 +120,7 @@ fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), Value::Object(obj) => { let mut obj_builder = builder.try_new_object()?; for (key, value) in obj.iter() { - let mut field_builder = ObjectFieldBuilder { - key, - builder: &mut obj_builder, - }; + let mut field_builder = ObjectFieldBuilder::new(key, &mut obj_builder); append_json(value, &mut field_builder)?; } obj_builder.finish(); @@ -132,25 +129,6 @@ fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), Ok(()) } -struct ObjectFieldBuilder<'o, 'v, 's> { - key: &'s str, - builder: &'o mut ObjectBuilder<'v>, -} - -impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> { - fn append_value<'m, 'v>(&mut self, value: impl Into>) { - self.builder.insert(self.key, value); - } - - fn try_new_list(&mut self) -> Result, ArrowError> { - self.builder.try_new_list(self.key) - } - - fn try_new_object(&mut self) -> Result, ArrowError> { - self.builder.try_new_object(self.key) - } -} - #[cfg(test)] mod test { use super::*; diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 2fa8d0981c5b..a7eb2467988a 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -1706,6 +1706,10 @@ impl<'a> ObjectBuilder<'a> { /// Allows users to append values to a [`VariantBuilder`], [`ListBuilder`] or /// [`ObjectBuilder`]. using the same interface. pub trait VariantBuilderExt { + /// Appends a NULL value to this builder. The semantics depend on the implementation, but will + /// often translate to appending a [`Variant::Null`] value. 
+ fn append_null(&mut self); + /// Appends a new variant value to this builder. See e.g. [`VariantBuilder::append_value`]. fn append_value<'m, 'v>(&mut self, value: impl Into>); @@ -1731,6 +1735,10 @@ pub trait VariantBuilderExt { } impl VariantBuilderExt for ListBuilder<'_> { + /// Variant arrays cannot encode NULL values, only `Variant::Null`. + fn append_null(&mut self) { + self.append_value(Variant::Null); + } fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); } @@ -1745,6 +1753,11 @@ impl VariantBuilderExt for ListBuilder<'_> { } impl VariantBuilderExt for VariantBuilder { + /// Variant values cannot encode NULL, only [`Variant::Null`]. This is different from the column + /// that holds variant values being NULL at some positions. + fn append_null(&mut self) { + self.append_value(Variant::Null); + } fn append_value<'m, 'v>(&mut self, value: impl Into>) { self.append_value(value); } @@ -1758,6 +1771,34 @@ impl VariantBuilderExt for VariantBuilder { } } +/// A [`VariantBuilderExt`] that inserts a new field into a variant object. +pub struct ObjectFieldBuilder<'o, 'v, 's> { + key: &'s str, + builder: &'o mut ObjectBuilder<'v>, +} + +impl<'o, 'v, 's> ObjectFieldBuilder<'o, 'v, 's> { + pub fn new(key: &'s str, builder: &'o mut ObjectBuilder<'v>) -> Self { + Self { key, builder } + } +} + +impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> { + /// A NULL object field is interpreted as missing, so nothing gets inserted at all. + fn append_null(&mut self) {} + fn append_value<'m, 'v>(&mut self, value: impl Into>) { + self.builder.insert(self.key, value); + } + + fn try_new_list(&mut self) -> Result, ArrowError> { + self.builder.try_new_list(self.key) + } + + fn try_new_object(&mut self) -> Result, ArrowError> { + self.builder.try_new_object(self.key) + } +} + #[cfg(test)] mod tests { use crate::VariantMetadata;