Skip to content

Commit 674dc17

Browse files
Add Fixed, Uuid support to arrow-avro (#7557)
# Which issue does this PR close? Part of [4886](#4886) Related to [6965](#6965) # Rationale for this change This change expands upon the Avro reader logic by adding full support for the Fixed and Uuid types (Uuid relies on Fixed). It builds out the `Fixed` path currently stubbed out. # What changes are included in this PR? Adds `Fixed` and `Uuid` support to the arrow-avro crate with changes to the following: 1. arrow-avro/src/codec.rs - Adds support for `Uuid` type - Adds test 2. arrow-avro/src/reader/cursor.rs: - Adds a `get_fixed` helper method to read the specified bytes into a buffer. 3. arrow-avro/src/reader/record.rs: - Introduces the Fixed decoding path, building out the `nyi` `Codec::Fixed` in the `Decoder`. - Introduces the Uuid decoding path, building off of Fixed - Adds tests. # Are there any user-facing changes? n/a --------- Co-authored-by: Connor Sanders <[email protected]>
1 parent 3183e03 commit 674dc17

File tree

3 files changed

+126
-1
lines changed

3 files changed

+126
-1
lines changed

arrow-avro/src/codec.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ pub enum Codec {
192192
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
193193
/// The i32 parameter indicates the fixed binary size
194194
Fixed(i32),
195+
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
196+
Uuid,
195197
/// Represents Avro array type, maps to Arrow's List data type
196198
List(Arc<AvroDataType>),
197199
/// Represents Avro record type, maps to Arrow's Struct data type
@@ -225,6 +227,7 @@ impl Codec {
225227
}
226228
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
227229
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
230+
Self::Uuid => DataType::FixedSizeBinary(16),
228231
Self::List(f) => {
229232
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
230233
}
@@ -457,6 +460,7 @@ fn make_data_type<'a>(
457460
*c = Codec::TimestampMicros(false)
458461
}
459462
(Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval,
463+
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
460464
(Some(logical), _) => {
461465
// Insert unrecognized logical type into metadata map
462466
field.metadata.insert("logicalType".into(), logical.into());
@@ -583,6 +587,17 @@ mod tests {
583587
assert!(matches!(result.codec, Codec::TimestampMicros(false)));
584588
}
585589

590+
#[test]
591+
fn test_uuid_type() {
592+
let mut codec = Codec::Fixed(16);
593+
594+
if let c @ Codec::Fixed(16) = &mut codec {
595+
*c = Codec::Uuid;
596+
}
597+
598+
assert!(matches!(codec, Codec::Uuid));
599+
}
600+
586601
#[test]
587602
fn test_duration_logical_type() {
588603
let mut codec = Codec::Fixed(12);

arrow-avro/src/reader/cursor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,16 @@ impl<'a> AvroCursor<'a> {
118118
self.buf = &self.buf[8..];
119119
Ok(ret)
120120
}
121+
122+
/// Read exactly `n` bytes from the buffer (e.g. for Avro `fixed`).
123+
pub(crate) fn get_fixed(&mut self, n: usize) -> Result<&'a [u8], ArrowError> {
124+
if self.buf.len() < n {
125+
return Err(ArrowError::ParseError(
126+
"Unexpected EOF reading fixed".to_string(),
127+
));
128+
}
129+
let ret = &self.buf[..n];
130+
self.buf = &self.buf[n..];
131+
Ok(ret)
132+
}
121133
}

arrow-avro/src/reader/record.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ enum Decoder {
122122
Vec<u8>,
123123
Box<Decoder>,
124124
),
125+
Fixed(i32, Vec<u8>),
125126
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
126127
}
127128

@@ -157,7 +158,7 @@ impl Decoder {
157158
Codec::TimestampMicros(is_utc) => {
158159
Self::TimestampMicros(*is_utc, Vec::with_capacity(DEFAULT_CAPACITY))
159160
}
160-
Codec::Fixed(_) => return nyi("decoding fixed"),
161+
Codec::Fixed(sz) => Self::Fixed(*sz, Vec::with_capacity(DEFAULT_CAPACITY)),
161162
Codec::Interval => return nyi("decoding interval"),
162163
Codec::List(item) => {
163164
let decoder = Self::try_new(item)?;
@@ -196,6 +197,7 @@ impl Decoder {
196197
Box::new(val_dec),
197198
)
198199
}
200+
Codec::Uuid => Self::Fixed(16, Vec::with_capacity(DEFAULT_CAPACITY)),
199201
};
200202

201203
Ok(match data_type.nullability() {
@@ -232,6 +234,9 @@ impl Decoder {
232234
moff.push_length(0);
233235
}
234236
Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
237+
Self::Fixed(sz, accum) => {
238+
accum.extend(std::iter::repeat(0u8).take(*sz as usize));
239+
}
235240
}
236241
}
237242

@@ -282,6 +287,10 @@ impl Decoder {
282287
false => e.append_null(),
283288
}
284289
}
290+
Self::Fixed(sz, accum) => {
291+
let fx = buf.get_fixed(*sz as usize)?;
292+
accum.extend_from_slice(fx);
293+
}
285294
}
286295
Ok(())
287296
}
@@ -383,6 +392,12 @@ impl Decoder {
383392
let map_arr = MapArray::new(map_field.clone(), moff, entries_struct, nulls, false);
384393
Arc::new(map_arr)
385394
}
395+
Self::Fixed(sz, accum) => {
396+
let b: Buffer = flush_values(accum).into();
397+
let arr = FixedSizeBinaryArray::try_new(*sz, b, nulls)
398+
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
399+
Arc::new(arr)
400+
}
386401
})
387402
}
388403
}
@@ -542,6 +557,89 @@ mod tests {
542557
assert_eq!(map_arr.value_length(0), 0);
543558
}
544559

560+
#[test]
561+
fn test_fixed_decoding() {
562+
let avro_type = avro_from_codec(Codec::Fixed(3));
563+
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
564+
565+
let data1 = [1u8, 2, 3];
566+
let mut cursor1 = AvroCursor::new(&data1);
567+
decoder
568+
.decode(&mut cursor1)
569+
.expect("Failed to decode data1");
570+
assert_eq!(cursor1.position(), 3, "Cursor should advance by fixed size");
571+
572+
let data2 = [4u8, 5, 6];
573+
let mut cursor2 = AvroCursor::new(&data2);
574+
decoder
575+
.decode(&mut cursor2)
576+
.expect("Failed to decode data2");
577+
assert_eq!(cursor2.position(), 3, "Cursor should advance by fixed size");
578+
579+
let array = decoder.flush(None).expect("Failed to flush decoder");
580+
581+
assert_eq!(array.len(), 2, "Array should contain two items");
582+
let fixed_size_binary_array = array
583+
.as_any()
584+
.downcast_ref::<FixedSizeBinaryArray>()
585+
.expect("Failed to downcast to FixedSizeBinaryArray");
586+
587+
assert_eq!(
588+
fixed_size_binary_array.value_length(),
589+
3,
590+
"Fixed size of binary values should be 3"
591+
);
592+
assert_eq!(
593+
fixed_size_binary_array.value(0),
594+
&[1, 2, 3],
595+
"First item mismatch"
596+
);
597+
assert_eq!(
598+
fixed_size_binary_array.value(1),
599+
&[4, 5, 6],
600+
"Second item mismatch"
601+
);
602+
}
603+
604+
#[test]
605+
fn test_fixed_decoding_empty() {
606+
let avro_type = avro_from_codec(Codec::Fixed(5));
607+
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
608+
609+
let array = decoder
610+
.flush(None)
611+
.expect("Failed to flush decoder for empty input");
612+
613+
assert_eq!(array.len(), 0, "Array should be empty");
614+
let fixed_size_binary_array = array
615+
.as_any()
616+
.downcast_ref::<FixedSizeBinaryArray>()
617+
.expect("Failed to downcast to FixedSizeBinaryArray for empty array");
618+
619+
assert_eq!(
620+
fixed_size_binary_array.value_length(),
621+
5,
622+
"Fixed size of binary values should be 5 as per type"
623+
);
624+
}
625+
626+
#[test]
627+
fn test_uuid_decoding() {
628+
let avro_type = avro_from_codec(Codec::Uuid);
629+
let mut decoder = Decoder::try_new(&avro_type).expect("Failed to create decoder");
630+
631+
let data1 = [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
632+
let mut cursor1 = AvroCursor::new(&data1);
633+
decoder
634+
.decode(&mut cursor1)
635+
.expect("Failed to decode data1");
636+
assert_eq!(
637+
cursor1.position(),
638+
16,
639+
"Cursor should advance by fixed size"
640+
);
641+
}
642+
545643
#[test]
546644
fn test_array_decoding() {
547645
let item_dt = avro_from_codec(Codec::Int32);

0 commit comments

Comments
 (0)