Skip to content

Commit 118ee3f

Browse files
Validate schema
1 parent fe77f2f commit 118ee3f

File tree

5 files changed

+399
-37
lines changed

5 files changed

+399
-37
lines changed

parquet-variant-compute/src/from_json.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use crate::{VariantArray, VariantArrayBuilder};
2222
use arrow::array::{Array, ArrayRef, StringArray};
23-
use arrow_schema::ArrowError;
23+
use arrow_schema::{ArrowError, DataType, Field, Fields};
2424
use parquet_variant::VariantBuilder;
2525
use parquet_variant_json::json_to_variant;
2626

@@ -35,7 +35,14 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
3535
)),
3636
}?;
3737

38-
let mut variant_array_builder = VariantArrayBuilder::new(input_string_array.len());
38+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
39+
let value_field = Field::new("value", DataType::BinaryView, false);
40+
41+
let schema = Fields::from(vec![metadata_field, value_field]);
42+
43+
let mut variant_array_builder =
44+
VariantArrayBuilder::try_new(input_string_array.len(), schema).unwrap();
45+
3946
for i in 0..input.len() {
4047
if input.is_null(i) {
4148
// The subfields are expected to be non-nullable according to the parquet variant spec.

parquet-variant-compute/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
mod from_json;
19+
mod shredding;
1920
mod to_json;
2021
mod variant_array;
2122
mod variant_array_builder;
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::{ArrowError, DataType, Fields};
19+
20+
// Keywords defined by the shredding spec
21+
pub const METADATA: &str = "metadata";
22+
pub const VALUE: &str = "value";
23+
pub const TYPED_VALUE: &str = "typed_value";
24+
25+
pub fn validate_value_and_typed_value(
26+
fields: &Fields,
27+
allow_both_null: bool,
28+
) -> Result<(), ArrowError> {
29+
let value_field_res = fields.iter().find(|f| f.name() == VALUE);
30+
let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE);
31+
32+
if !allow_both_null {
33+
if let (None, None) = (value_field_res, typed_value_field_res) {
34+
return Err(ArrowError::InvalidArgumentError(
35+
"Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string()
36+
));
37+
}
38+
}
39+
40+
if let Some(value_field) = value_field_res {
41+
// if !value_field.is_nullable() {
42+
// return Err(ArrowError::InvalidArgumentError(
43+
// "Expected value field to be nullable".to_string(),
44+
// ));
45+
// }
46+
47+
if value_field.data_type() != &DataType::BinaryView {
48+
return Err(ArrowError::NotYetImplemented(format!(
49+
"VariantArray 'value' field must be BinaryView, got {}",
50+
value_field.data_type()
51+
)));
52+
}
53+
}
54+
55+
if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) {
56+
// if !typed_value_field.is_nullable() {
57+
// return Err(ArrowError::InvalidArgumentError(
58+
// "Expected value field to be nullable".to_string(),
59+
// ));
60+
// }
61+
62+
// this is directly mapped from the spec's parquet physical types
63+
// note, there are more data types we can support
64+
// but for the sake of simplicity, I chose the smallest subset
65+
match typed_value_field.data_type() {
66+
DataType::Boolean
67+
| DataType::Int32
68+
| DataType::Int64
69+
| DataType::Float32
70+
| DataType::Float64
71+
| DataType::BinaryView => {}
72+
DataType::Union(union_fields, _) => {
73+
union_fields
74+
.iter()
75+
.map(|(_, f)| f.clone())
76+
.try_for_each(|f| {
77+
let DataType::Struct(fields) = f.data_type().clone() else {
78+
return Err(ArrowError::InvalidArgumentError(
79+
"Expected struct".to_string(),
80+
));
81+
};
82+
83+
validate_value_and_typed_value(&fields, false)
84+
})?;
85+
}
86+
DataType::Dictionary(key, value) => {
87+
if key.as_ref() != &DataType::Utf8View {
88+
return Err(ArrowError::NotYetImplemented(format!(
89+
"Unsupported type. Expected dictionary key to be Utf8View, got {key}"
90+
)));
91+
}
92+
93+
if let DataType::Struct(fields) = value.as_ref() {
94+
validate_value_and_typed_value(fields, true)?;
95+
} else {
96+
return Err(ArrowError::NotYetImplemented(format!(
97+
"Unsupported type. Expected dictionary values to be Utf8View, got {value}"
98+
)));
99+
}
100+
}
101+
DataType::Struct(fields) => validate_value_and_typed_value(fields, false)?, // the ide
102+
foreign => {
103+
return Err(ArrowError::NotYetImplemented(format!(
104+
"Unsupported VariantArray 'typed_value' field, got {foreign}"
105+
)))
106+
}
107+
}
108+
}
109+
110+
Ok(())
111+
}
112+
113+
/// Validates that the provided [`Fields`] conform to the Variant shredding specification.
114+
///
115+
/// # Requirements
116+
/// - Must contain a "metadata" field of type BinaryView
117+
/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type)
118+
/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields
119+
pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> {
120+
let metadata_field = fields
121+
.iter()
122+
.find(|f| f.name() == METADATA)
123+
.ok_or_else(|| {
124+
ArrowError::InvalidArgumentError(
125+
"Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(),
126+
)
127+
})?;
128+
129+
if metadata_field.is_nullable() {
130+
return Err(ArrowError::InvalidArgumentError(
131+
"Invalid VariantArray: metadata field can not be nullable".to_string(),
132+
));
133+
}
134+
135+
if metadata_field.data_type() != &DataType::BinaryView {
136+
return Err(ArrowError::NotYetImplemented(format!(
137+
"VariantArray 'metadata' field must be BinaryView, got {}",
138+
metadata_field.data_type()
139+
)));
140+
}
141+
142+
validate_value_and_typed_value(fields, false)?;
143+
144+
Ok(())
145+
}
146+
147+
#[cfg(test)]
148+
mod tests {
149+
use super::*;
150+
151+
use arrow_schema::{Field, UnionFields, UnionMode};
152+
153+
#[test]
154+
fn test_regular_variant_schema() {
155+
// a regular variant schema
156+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
157+
let value_field = Field::new("value", DataType::BinaryView, false);
158+
159+
let schema = Fields::from(vec![metadata_field, value_field]);
160+
161+
validate_shredded_schema(&schema).unwrap();
162+
}
163+
164+
#[test]
165+
fn test_regular_variant_schema_order_agnostic() {
166+
// a regular variant schema
167+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
168+
let value_field = Field::new("value", DataType::BinaryView, false);
169+
170+
let schema = Fields::from(vec![value_field, metadata_field]); // note the order switch
171+
172+
validate_shredded_schema(&schema).unwrap();
173+
}
174+
175+
#[test]
176+
fn test_regular_variant_schema_allow_other_columns() {
177+
// a regular variant schema
178+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
179+
let value_field = Field::new("value", DataType::BinaryView, false);
180+
181+
let trace_field = Field::new("trace_id", DataType::Utf8View, false);
182+
let created_at_field = Field::new("created_at", DataType::Date64, false);
183+
184+
let schema = Fields::from(vec![
185+
metadata_field,
186+
trace_field,
187+
created_at_field,
188+
value_field,
189+
]);
190+
191+
validate_shredded_schema(&schema).unwrap();
192+
}
193+
194+
#[test]
195+
fn test_regular_variant_schema_missing_metadata() {
196+
let value_field = Field::new("value", DataType::BinaryView, false);
197+
let schema = Fields::from(vec![value_field]);
198+
199+
let err = validate_shredded_schema(&schema).unwrap_err();
200+
201+
assert_eq!(
202+
err.to_string(),
203+
"Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field"
204+
);
205+
}
206+
207+
#[test]
208+
fn test_regular_variant_schema_nullable_metadata() {
209+
let metadata_field = Field::new("metadata", DataType::BinaryView, true);
210+
let value_field = Field::new("value", DataType::BinaryView, false);
211+
212+
let schema = Fields::from(vec![metadata_field, value_field]);
213+
214+
let err = validate_shredded_schema(&schema).unwrap_err();
215+
216+
assert_eq!(
217+
err.to_string(),
218+
"Invalid argument error: Invalid VariantArray: metadata field can not be nullable"
219+
);
220+
}
221+
222+
#[test]
223+
fn test_regular_variant_schema_allow_nullable_value() {
224+
// a regular variant schema
225+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
226+
let value_field = Field::new("value", DataType::BinaryView, true);
227+
228+
let schema = Fields::from(vec![metadata_field, value_field]);
229+
230+
validate_shredded_schema(&schema).unwrap();
231+
}
232+
233+
#[test]
234+
fn test_shredded_variant_schema() {
235+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
236+
let typed_value_field = Field::new("typed_value", DataType::Int64, false);
237+
let schema = Fields::from(vec![metadata_field, typed_value_field]);
238+
239+
validate_shredded_schema(&schema).unwrap();
240+
}
241+
242+
#[test]
243+
fn test_partially_shredded_variant_schema() {
244+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
245+
let value_field = Field::new("value", DataType::BinaryView, false);
246+
let typed_value_field = Field::new("typed_value", DataType::Int64, false);
247+
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]);
248+
249+
validate_shredded_schema(&schema).unwrap();
250+
}
251+
252+
#[test]
253+
fn test_partially_shredded_variant_list_schema() {
254+
/*
255+
optional group tags (VARIANT) {
256+
required binary metadata;
257+
optional binary value;
258+
optional group typed_value (LIST) { # must be optional to allow a null list
259+
repeated group list {
260+
required group element { # shredded element
261+
optional binary value;
262+
optional binary typed_value (STRING);
263+
}
264+
required group element { # shredded element
265+
optional binary value;
266+
optional int64 typed_value ;
267+
}
268+
}
269+
}
270+
}
271+
*/
272+
273+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
274+
let value_field = Field::new("value", DataType::BinaryView, false);
275+
276+
// Define union fields for different element types
277+
let string_element = {
278+
let value_field = Field::new("value", DataType::BinaryView, true);
279+
let typed_value = Field::new("typed_value", DataType::BinaryView, true);
280+
DataType::Struct(Fields::from(vec![value_field, typed_value]))
281+
};
282+
283+
let int_element = {
284+
let value_field = Field::new("value", DataType::BinaryView, true);
285+
let typed_value = Field::new("typed_value", DataType::Int64, true);
286+
DataType::Struct(Fields::from(vec![value_field, typed_value]))
287+
};
288+
289+
// Create union of different element types
290+
let union_fields = UnionFields::new(
291+
vec![0, 1],
292+
vec![
293+
Field::new("string_element", string_element, true),
294+
Field::new("int_element", int_element, true),
295+
],
296+
);
297+
298+
let typed_value_field = Field::new(
299+
"typed_value",
300+
DataType::Union(union_fields, UnionMode::Sparse),
301+
false,
302+
);
303+
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]);
304+
305+
validate_shredded_schema(&schema).unwrap();
306+
}
307+
308+
#[test]
309+
fn test_partially_shredded_variant_object_schema() {
310+
/*
311+
optional group event (VARIANT) {
312+
required binary metadata;
313+
optional binary value; # a variant, expected to be an object
314+
optional group typed_value { # shredded fields for the variant object
315+
required group event_type { # shredded field for event_type
316+
optional binary value;
317+
optional binary typed_value (STRING);
318+
}
319+
required group event_ts { # shredded field for event_ts
320+
optional binary value;
321+
optional int64 typed_value (TIMESTAMP(true, MICROS));
322+
}
323+
}
324+
}
325+
*/
326+
327+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
328+
let value_field = Field::new("value", DataType::BinaryView, false);
329+
330+
// event_type
331+
let element_group_1 = {
332+
let value_field = Field::new("value", DataType::BinaryView, false);
333+
let typed_value = Field::new("typed_value", DataType::BinaryView, false); // this is the string case
334+
335+
Fields::from(vec![value_field, typed_value])
336+
};
337+
338+
// event_ts
339+
let element_group_2 = {
340+
let value_field = Field::new("value", DataType::BinaryView, false);
341+
let typed_value = Field::new("typed_value", DataType::Int64, false);
342+
343+
Fields::from(vec![value_field, typed_value])
344+
};
345+
346+
let typed_value_field = Field::new(
347+
"typed_value",
348+
DataType::Union(
349+
UnionFields::new(
350+
vec![0, 1],
351+
vec![
352+
Field::new("event_type", DataType::Struct(element_group_1), true),
353+
Field::new("event_ts", DataType::Struct(element_group_2), true),
354+
],
355+
),
356+
UnionMode::Sparse,
357+
),
358+
false,
359+
);
360+
let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]);
361+
362+
validate_shredded_schema(&schema).unwrap();
363+
}
364+
}

0 commit comments

Comments
 (0)