-
Notifications
You must be signed in to change notification settings - Fork 1k
Add arrow-avro Reader support for Dense Union and Union resolution (Part 1) #8348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add arrow-avro Reader support for Dense Union and Union resolution (Part 1) #8348
Conversation
1331619
to
4c1cbcd
Compare
nullable_union_variants(reader_variants), | ||
) { | ||
(Some((_, write_nonnull)), Some((read_nb, read_nonnull))) => { | ||
(Some((write_nb, write_nonnull)), Some((_read_nb, read_nonnull))) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized while regression testing Unions that it made more sense and resulted in less complexity in Union decoding to use the writer's null ordering. This in itself also does not deviate from the Avro specification.
4c1cbcd
to
db0adee
Compare
impl Codec { | ||
/// Converts a string codec to use Utf8View if requested | ||
/// | ||
/// The conversion only happens if both: | ||
/// 1. `use_utf8view` is true | ||
/// 2. The codec is currently `Utf8` | ||
/// | ||
/// # Example | ||
/// ``` | ||
/// # use arrow_avro::codec::Codec; | ||
/// let utf8_codec1 = Codec::Utf8; | ||
/// let utf8_codec2 = Codec::Utf8; | ||
/// | ||
/// // Convert to Utf8View | ||
/// let view_codec = utf8_codec1.with_utf8view(true); | ||
/// assert!(matches!(view_codec, Codec::Utf8View)); | ||
/// | ||
/// // Don't convert if use_utf8view is false | ||
/// let unchanged_codec = utf8_codec2.with_utf8view(false); | ||
/// assert!(matches!(unchanged_codec, Codec::Utf8)); | ||
/// ``` | ||
pub fn with_utf8view(self, use_utf8view: bool) -> Self { | ||
if use_utf8view && matches!(self, Self::Utf8) { | ||
Self::Utf8View | ||
} else { | ||
self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this code to the other Codec
implementation block to consolidate.
…s in Avro union validation - Expanded `arrow-avro` to support resolving Arrow unions to Avro unions including validation against duplicate branch types, nullability handling, and prohibition of nested unions. - Refactored `Codec`, schema resolution, and added new utility functions for union branch management. - Introduced dense union field mapping, Arrow metadata persistence, and Avro union branch validation. - Extended tests to cover union scenarios including dense mode, metadata persistence, and type promotion.
db0adee
to
2d3d4e2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great to me, just one question on the inlining from previous work.
arrow-avro/src/codec.rs
Outdated
} | ||
} | ||
|
||
#[inline] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we omitting these optimizations until we can prove them out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nathaniel-d-ef Thats fair enough. I took out the #[inline]
.
894e497
to
cb1c7e0
Compare
@alamb Let me know if you have time to get to this one. After this there's just one more to go. 😃 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jecsand838 -- as always I found the PR easy to read and understand, though given my limited avro knowledge I will not be able to pick up subtle avro issues.
The only thing I didn't see in this PR was "end to end" tests -- namely reading union data into arrow UnionArrays
Ideally we would also have round trip tests where we wrote a UnionArray
to avro and the read it back again ensuring the resulting array was the same
@alamb Absolutely! So I have the end to end tests in the part 2 PR #8349 . This PR only covers the codec and schema changes. The part 2 PR has the decoder updates and end to end tests along with a new test file containing all possible Union type scenarios. I just couldn't find a cleaner way to break the work up. Also the roundtrip tests will be in @nathaniel-d-ef 's upcoming PR for Let me know if you're okay with this breakdown. |
Sounds good to me 🚀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops... my review took too long...
Codec::Union(encodings, _, _) if !encodings.is_empty() | ||
&& matches!(encodings[0].codec(), Codec::Null) => | ||
{ | ||
Ok(AvroLiteral::Null) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: that is some funky formatting, but I guess it's what fmt
produced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, but that's what came out of fmt
.
if encodings.is_empty() { | ||
return Err(ArrowError::SchemaError( | ||
"Union with no branches cannot have a default".to_string(), | ||
)); | ||
} | ||
encodings[0].parse_default_literal(default_json)? | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
if encodings.is_empty() { | |
return Err(ArrowError::SchemaError( | |
"Union with no branches cannot have a default".to_string(), | |
)); | |
} | |
encodings[0].parse_default_literal(default_json)? | |
} | |
let Some(default_encoding) = encodings.first() else { | |
return Err(ArrowError::SchemaError( | |
"Union with no branches cannot have a default".to_string(), | |
)); | |
}; | |
default_encoding.parse_default_literal(default_json)? | |
} |
Schema::Complex(ComplexType::Record(r)) => { | ||
let (full, _) = make_full_name(r.name, r.namespace, enclosing_ns); | ||
Some(UnionBranchKey::Named(full)) | ||
} | ||
Schema::Complex(ComplexType::Enum(e)) => { | ||
let (full, _) = make_full_name(e.name, e.namespace, enclosing_ns); | ||
Some(UnionBranchKey::Named(full)) | ||
} | ||
Schema::Complex(ComplexType::Fixed(f)) => { | ||
let (full, _) = make_full_name(f.name, f.namespace, enclosing_ns); | ||
Some(UnionBranchKey::Named(full)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
qq: Would it be cleaner -- or not -- to rearrange this match as follows?
let (name, namespace) = match s {
Schema::TypeName(TypeName::Primitive(p))
| Schema::Type(Type {
r#type: TypeName::Primitive(p),
..
}) => return Some(UnionBranchKey::Primitive(*p)),
Schema::TypeName(TypeName::Ref(name))
| Schema::Type(Type {
r#type: TypeName::Ref(name),
..
}) => (name, None),
Schema::Complex(ComplexType::Array(_)) => return Some(UnionBranchKey::Array),
Schema::Complex(ComplexType::Map(_)) => return Some(UnionBranchKey::Map),
Schema::Complex(ComplexType::Record(r)) => (r.name, r.namespace),
Schema::Complex(ComplexType::Enum(e)) => (e.name, e.namespace),
Schema::Complex(ComplexType::Fixed(f)) => (f.name, f.namespace),
Schema::Union(_) => return None,
};
let (full, _) = make_full_name(name, namespace, enclosing_ns);
Some(UnionBranchKey::Named(full))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's much cleaner. Ty for that suggestion.
branches: &'a [Schema<'a>], | ||
enclosing_ns: Option<&'a str>, | ||
) -> Option<String> { | ||
let mut seen: HashSet<UnionBranchKey> = HashSet::with_capacity(branches.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: type annotation shouldn't be necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100%, I'm removing that.
match ( | ||
nullable_union_variants(writer_variants.as_slice()), | ||
nullable_union_variants(reader_variants.as_slice()), | ||
) { | ||
(Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => { | ||
let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?; | ||
dt.nullability = Some(w_nb); | ||
Ok(dt) | ||
} | ||
_ => self.resolve_unions( | ||
writer_variants.as_slice(), | ||
reader_variants.as_slice(), | ||
namespace, | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match ( | |
nullable_union_variants(writer_variants.as_slice()), | |
nullable_union_variants(reader_variants.as_slice()), | |
) { | |
(Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => { | |
let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?; | |
dt.nullability = Some(w_nb); | |
Ok(dt) | |
} | |
_ => self.resolve_unions( | |
writer_variants.as_slice(), | |
reader_variants.as_slice(), | |
namespace, | |
), | |
let writer_variants = writer_variants.as_slice(); | |
let reader_variants = reader_variants.as_slice(); | |
match ( | |
nullable_union_variants(writer_variants), | |
nullable_union_variants(reader_variants), | |
) { | |
(Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => { | |
let mut dt = self.make_data_type(w_nonnull, Some(r_nonnull), namespace)?; | |
dt.nullability = Some(w_nb); | |
Ok(dt) | |
} | |
_ => self.resolve_unions(writer_variants, reader_variants, namespace), |
let mut writer_to_reader: Vec<Option<(usize, Promotion)>> = | ||
Vec::with_capacity(writer_variants.len()); | ||
for writer in writer_variants { | ||
match self.resolve_type(writer, reader_non_union, namespace) { | ||
Ok(tmp) => writer_to_reader.push(Some((0usize, Self::coercion_from(&tmp)))), | ||
Err(_) => writer_to_reader.push(None), | ||
} | ||
} | ||
let mut dt = self.parse_type(reader_non_union, namespace)?; | ||
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion { | ||
writer_to_reader: Arc::from(writer_to_reader), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
let mut writer_to_reader: Vec<Option<(usize, Promotion)>> = | |
Vec::with_capacity(writer_variants.len()); | |
for writer in writer_variants { | |
match self.resolve_type(writer, reader_non_union, namespace) { | |
Ok(tmp) => writer_to_reader.push(Some((0usize, Self::coercion_from(&tmp)))), | |
Err(_) => writer_to_reader.push(None), | |
} | |
} | |
let mut dt = self.parse_type(reader_non_union, namespace)?; | |
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion { | |
writer_to_reader: Arc::from(writer_to_reader), | |
let writer_to_reader = writer_variants.iter().filter_map(|writer| { | |
let tmp = self.resolve_type(writer, reader_non_union, namespace).ok()?; | |
Some((0usize, Self::coercion_from(&tmp)))) | |
})); | |
let mut dt = self.parse_type(reader_non_union, namespace)?; | |
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion { | |
writer_to_reader: Arc::from(writer_to_reader.collect()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm getting type annotation errors from this:
type annotations needed [E0283]
cannot infer type of the type parameter `B` declared on the method `collect`
Have a slight variation that I'll push up in the PR. This is definitely cleaner though.
if how == Promotion::Direct { | ||
direct = Some((reader_index, how)); | ||
break; // first exact match wins | ||
} else if promo.is_none() { | ||
promo = Some((reader_index, how)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double checking intent -- Use the first-found promo, unless a direct match is found?
If so, I think we can use just the promo
option for both:
if how == Promotion::Direct {
promo = Some((reader_index, how));
break; // first exact match wins
}
if promo.is_none() {
// first promo wins, unless an exact match is found later
promo = Some((reader_index, how));
}
and then
let Some((reader_index, promotion) = promo else {
return ArrowError::SchemaError(...);
};
(again below)
Schema::Union(branches) | ||
} | ||
|
||
fn mk_record_named(name: &'static str) -> Schema<'static> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static lifetimes will pretty strongly constrain real-world usage... is there a reason it needs to be fixed? Why not just
fn mk_record_name<'a>(name: &'a str) -> Schema<'a>
match null_order { | ||
Nullability::NullFirst => { | ||
let mut out = Vec::with_capacity(union.len() + 1); | ||
out.push(null); | ||
out.extend(union); | ||
Value::Array(out) | ||
} | ||
Nullability::NullSecond => { | ||
union.push(null); | ||
Value::Array(union) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match null_order { | |
Nullability::NullFirst => { | |
let mut out = Vec::with_capacity(union.len() + 1); | |
out.push(null); | |
out.extend(union); | |
Value::Array(out) | |
} | |
Nullability::NullSecond => { | |
union.push(null); | |
Value::Array(union) | |
} | |
} | |
match null_order { | |
Nullability::NullFirst => union.insert(0, null), | |
Nullability::NullSecond => union.push(null), | |
} | |
Value::Array(union) |
(I guess it should really be called Nullability::NullLast
?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is much cleaner.
(I guess it should really be called Nullability::NullLast?)
100% That change is coming. Just wanted to get a dedicated follow-up PR for it.
})?; | ||
match t { | ||
"record" | "enum" | "fixed" => { | ||
let name = map.get("name").and_then(|v| v.as_str()).unwrap_or_default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the default &str
, out of curiosity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know what, I should be throwing an error here. This is out of spec, since "record" | "enum" | "fixed"
are named types. Ty for pointing this out. I'll include it in the follow-up.
Sorry -- hopefully @jecsand838 can address any needed comments as a follow on PR |
I'll make another PR with these changes. |
# Which issue does this PR close? This work continues arrow-avro schema resolution support and aligns behavior with the Avro spec. - **Related to**: #4886 (“Add Avro Support”): ongoing work to round out the reader/decoder, including schema resolution and type promotion. - **Follow-ups/Context**: #8348 (Add arrow-avro Reader support for Dense Union and Union resolution (Part 1)) # Rationale for this change @scovich left a really solid [review](#8348 (review)) on #8348 that wasn't completed until after the PR was merged in. This PR is for addressing the suggestions and improving the code. # What changes are included in this PR? * Code quality improvements to `codec.rs` * Improvements to `schema.rs` including spec compliant named type errors. # Are these changes tested? 1. No functionality was added / modified in `codec.rs` and all existing tests are passing without changes. 2. Two new unit tests were added to `schema.rs` to cover the spec compliant named type changes. # Are there any user-facing changes? N/A --------- Co-authored-by: Ryan Johnson <[email protected]>
Which issue does this PR close?
This work continues arrow-avro schema resolution support and aligns behavior with the Avro spec.
Rationale for this change
arrow-avro
lacked end‑to‑end support for Avro unions and ArrowUnion
schemas. Many Avro datasets rely on unions (i.e.,["null","string"]
, tagged unions of different records), and without schema‐level resolution and JSON encoding the crate could not interoperate cleanly. This PR brings union schema resolution to parity with the Avro spec (duplicate-branch and nested‑union checks), adds Arrow to Avro union schema conversion (with mode/type‑id metadata), and lays groundwork for data decoding in a follow‑up.What changes are included in this PR?
Schema resolution & codecs
Codec::Union(Arc<[AvroDataType]>, UnionFields, UnionMode)
and map it to ArrowDataType::Union
.ResolvedUnion
and extendResolutionInfo
with aUnion(...)
variant to capture writer to reader branch mapping (prefers direct matches over promotions).null
defaults for unions whose first branch isnull
; reject empty unions for defaults.record
/enum
/fixed
).["null", "int"]
vs["int", "null"]
).int
,string
,map
, ...) and construct denseUnionFields
consistently.Arrow and Avro schema conversion
DataType::Union
to Avro union JSON:"arrowUnionMode"
:"dense"
or"sparse"
."arrowUnionTypeIds"
: ordered list of Arrow type IDs."arrowBinaryView"
forBinaryView
."arrowListView"
/"arrowLargeList"
for list view types.Reader/decoder stub
RecordDecoder
(schema support first; decoding to follow).Refactors & utilities
make_full_name
within the crate for union branch keying; deriveHash
forPrimitiveType
; add helpers for branch de‑duplication.Are these changes tested?
Yes. New unit tests cover:
Union
to Avro union JSON including mode/type‑id metadata and branch shapes.Are there any user-facing changes?
N/A