Skip to content

Commit 4a21443

Browse files
jecsand838 and scovich
authored
Implement arrow-avro SchemaStore and Fingerprinting To Enable Schema Resolution (#8006)
# Which issue does this PR close? - Part of #4886 - Follow up to #7834 # Rationale for this change Apache Avro’s [single object encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding) prefixes every record with the marker `0xC3 0x01` followed by a `Rabin` [schema fingerprint](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints) so that readers can identify the correct writer schema without carrying the full definition in each message. While the current `arrow‑avro` implementation can read container files, it cannot ingest these framed messages or handle streams where the writer schema changes over time. The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin) hashed fingerprint of the [parsed canonical form of a schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas) to look up the `Schema` from a local schema store or registry. This PR introduces **`SchemaStore`** and **fingerprinting** to enable: * **Zero‑copy schema identification** for decoding streaming Avro messages published in single‑object format (e.g., Kafka, Pulsar, etc.) into Arrow. * **Dynamic schema evolution** by laying the foundation to resolve writer/reader schema differences on the fly. **NOTE:** Schema Resolution support in `Codec` and `RecordDecoder` is coming in the next PR. # What changes are included in this PR? 
| Area | Highlights | | ------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | **`reader/mod.rs`** | Decoder now detects the `C3 01` prefix, extracts the fingerprint, looks up the writer schema in a `SchemaStore`, and switches to an LRU cached `RecordDecoder` without interrupting streaming; supports `static_store_mode` to skip the 2 byte peek for high‑throughput fixed‑schema pipelines. | | **`ReaderBuilder`** | New builder configuration methods: `.with_writer_schema_store`, `.with_active_fingerprint`, `.with_static_store_mode`, `.with_reader_schema`, `.with_max_decoder_cache_size`, with rigorous validation to prevent misconfiguration. | | **Unit tests** | New tests covering fingerprint generation, store registration/lookup, schema switching, unknown‑fingerprint errors, and interaction with UTF8‑view decoding. | | **Docs & Examples** | Extensive inline docs with examples on all new public methods / structs. | --- # Are these changes tested? Yes. New tests cover: 1. **Fingerprinting** against the canonical examples from the Avro spec 2. **`SchemaStore` behavior** deduplication, duplicate registration, and lookup. 3. **Decoder fast‑path** with `static_store_mode=true`, ensuring the prefix is treated as payload, the 2 byte peek is skipped, and no schema switch is attempted. # Are there any user-facing changes? N/A # Follow-Up PRs 1. Implement Schema Resolution Functionality in Codec and RecordDecoder 2. Add ID `Fingerprint` variant on `SchemaStore` for Confluent Schema Registry compatibility 3. Improve arrow-avro errors + add more benchmarks & examples to prepare for public release --------- Co-authored-by: Ryan Johnson <[email protected]>
1 parent c561acb commit 4a21443

File tree

5 files changed

+589
-166
lines changed

5 files changed

+589
-166
lines changed

arrow-avro/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ zstd = { version = "0.13", default-features = false, optional = true }
5555
bzip2 = { version = "0.6.0", optional = true }
5656
xz = { version = "0.1", default-features = false, optional = true }
5757
crc = { version = "3.0", optional = true }
58-
uuid = "1.17"
5958
strum_macros = "0.27"
59+
uuid = "1.17"
60+
indexmap = "2.10"
61+
6062

6163
[dev-dependencies]
6264
arrow-data = { workspace = true }

arrow-avro/benches/decoder.rs

Lines changed: 101 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -27,68 +27,89 @@ extern crate uuid;
2727

2828
use apache_avro::types::Value;
2929
use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
30-
use arrow_avro::{reader::ReaderBuilder, schema::Schema as AvroSchema};
30+
use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
31+
use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
3132
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
3233
use once_cell::sync::Lazy;
33-
use std::{hint::black_box, io, time::Duration};
34+
use std::{hint::black_box, time::Duration};
3435
use uuid::Uuid;
3536

36-
fn encode_records(schema: &ApacheSchema, rows: impl Iterator<Item = Value>) -> Vec<u8> {
37+
fn make_prefix(fp: Fingerprint) -> [u8; 10] {
38+
let Fingerprint::Rabin(val) = fp;
39+
let mut buf = [0u8; 10];
40+
buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
41+
buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
42+
buf
43+
}
44+
45+
fn encode_records_with_prefix(
46+
schema: &ApacheSchema,
47+
prefix: &[u8],
48+
rows: impl Iterator<Item = Value>,
49+
) -> Vec<u8> {
3750
let mut out = Vec::new();
3851
for v in rows {
52+
out.extend_from_slice(prefix);
3953
out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
4054
}
4155
out
4256
}
4357

44-
fn gen_int(sc: &ApacheSchema, n: usize) -> Vec<u8> {
45-
encode_records(
58+
fn gen_int(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
59+
encode_records_with_prefix(
4660
sc,
61+
prefix,
4762
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
4863
)
4964
}
5065

51-
fn gen_long(sc: &ApacheSchema, n: usize) -> Vec<u8> {
52-
encode_records(
66+
fn gen_long(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
67+
encode_records_with_prefix(
5368
sc,
69+
prefix,
5470
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long(i as i64))])),
5571
)
5672
}
5773

58-
fn gen_float(sc: &ApacheSchema, n: usize) -> Vec<u8> {
59-
encode_records(
74+
fn gen_float(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
75+
encode_records_with_prefix(
6076
sc,
77+
prefix,
6178
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Float(i as f32 + 0.5678))])),
6279
)
6380
}
6481

65-
fn gen_bool(sc: &ApacheSchema, n: usize) -> Vec<u8> {
66-
encode_records(
82+
fn gen_bool(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
83+
encode_records_with_prefix(
6784
sc,
85+
prefix,
6886
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Boolean(i % 2 == 0))])),
6987
)
7088
}
7189

72-
fn gen_double(sc: &ApacheSchema, n: usize) -> Vec<u8> {
73-
encode_records(
90+
fn gen_double(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
91+
encode_records_with_prefix(
7492
sc,
93+
prefix,
7594
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Double(i as f64 + 0.1234))])),
7695
)
7796
}
7897

79-
fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
80-
encode_records(
98+
fn gen_bytes(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
99+
encode_records_with_prefix(
81100
sc,
101+
prefix,
82102
(0..n).map(|i| {
83103
let payload = vec![(i & 0xFF) as u8; 16];
84104
Value::Record(vec![("field1".into(), Value::Bytes(payload))])
85105
}),
86106
)
87107
}
88108

89-
fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
90-
encode_records(
109+
fn gen_string(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
110+
encode_records_with_prefix(
91111
sc,
112+
prefix,
92113
(0..n).map(|i| {
93114
let s = if i % 3 == 0 {
94115
format!("value-{i}")
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
100121
)
101122
}
102123

103-
fn gen_date(sc: &ApacheSchema, n: usize) -> Vec<u8> {
104-
encode_records(
124+
fn gen_date(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
125+
encode_records_with_prefix(
105126
sc,
127+
prefix,
106128
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
107129
)
108130
}
109131

110-
fn gen_timemillis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
111-
encode_records(
132+
fn gen_timemillis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
133+
encode_records_with_prefix(
112134
sc,
135+
prefix,
113136
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int((i * 37) as i32))])),
114137
)
115138
}
116139

117-
fn gen_timemicros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
118-
encode_records(
140+
fn gen_timemicros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
141+
encode_records_with_prefix(
119142
sc,
143+
prefix,
120144
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long((i * 1_001) as i64))])),
121145
)
122146
}
123147

124-
fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
125-
encode_records(
148+
fn gen_ts_millis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
149+
encode_records_with_prefix(
126150
sc,
151+
prefix,
127152
(0..n).map(|i| {
128153
Value::Record(vec![(
129154
"field1".into(),
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
133158
)
134159
}
135160

136-
fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
137-
encode_records(
161+
fn gen_ts_micros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
162+
encode_records_with_prefix(
138163
sc,
164+
prefix,
139165
(0..n).map(|i| {
140166
Value::Record(vec![(
141167
"field1".into(),
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
145171
)
146172
}
147173

148-
fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
174+
fn gen_map(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
149175
use std::collections::HashMap;
150-
encode_records(
176+
encode_records_with_prefix(
151177
sc,
178+
prefix,
152179
(0..n).map(|i| {
153180
let mut m = HashMap::new();
154181
let int_val = |v: i32| Value::Union(0, Box::new(Value::Int(v)));
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
165192
)
166193
}
167194

168-
fn gen_array(sc: &ApacheSchema, n: usize) -> Vec<u8> {
169-
encode_records(
195+
fn gen_array(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
196+
encode_records_with_prefix(
170197
sc,
198+
prefix,
171199
(0..n).map(|i| {
172200
let items = (0..5).map(|j| Value::Int(i as i32 + j)).collect();
173201
Value::Record(vec![("field1".into(), Value::Array(items))])
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
189217
full[first..].to_vec()
190218
}
191219

192-
fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
193-
encode_records(
220+
fn gen_decimal(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
221+
encode_records_with_prefix(
194222
sc,
223+
prefix,
195224
(0..n).map(|i| {
196225
let unscaled = if i % 2 == 0 { i as i128 } else { -(i as i128) };
197226
Value::Record(vec![(
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
202231
)
203232
}
204233

205-
fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
206-
encode_records(
234+
fn gen_uuid(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
235+
encode_records_with_prefix(
207236
sc,
237+
prefix,
208238
(0..n).map(|i| {
209239
let mut raw = (i as u128).to_be_bytes();
210240
raw[6] = (raw[6] & 0x0F) | 0x40;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
214244
)
215245
}
216246

217-
fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
218-
encode_records(
247+
fn gen_fixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
248+
encode_records_with_prefix(
219249
sc,
250+
prefix,
220251
(0..n).map(|i| {
221252
let mut buf = vec![0u8; 16];
222253
buf[..8].copy_from_slice(&(i as u64).to_be_bytes());
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
225256
)
226257
}
227258

228-
fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
229-
encode_records(
259+
fn gen_interval(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
260+
encode_records_with_prefix(
230261
sc,
262+
prefix,
231263
(0..n).map(|i| {
232264
let months = (i % 24) as u32;
233265
let days = (i % 32) as u32;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
241273
)
242274
}
243275

244-
fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
276+
fn gen_enum(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
245277
const SYMBOLS: [&str; 3] = ["A", "B", "C"];
246-
encode_records(
278+
encode_records_with_prefix(
247279
sc,
280+
prefix,
248281
(0..n).map(|i| {
249282
let idx = i % 3;
250283
Value::Record(vec![(
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
255288
)
256289
}
257290

258-
fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
259-
encode_records(
291+
fn gen_mixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
292+
encode_records_with_prefix(
260293
sc,
294+
prefix,
261295
(0..n).map(|i| {
262296
Value::Record(vec![
263297
("f1".into(), Value::Int(i as i32)),
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
269303
)
270304
}
271305

272-
fn gen_nested(sc: &ApacheSchema, n: usize) -> Vec<u8> {
273-
encode_records(
306+
fn gen_nested(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
307+
encode_records_with_prefix(
274308
sc,
309+
prefix,
275310
(0..n).map(|i| {
276311
let sub = Value::Record(vec![
277312
("x".into(), Value::Int(i as i32)),
@@ -290,12 +325,14 @@ fn new_decoder(
290325
batch_size: usize,
291326
utf8view: bool,
292327
) -> arrow_avro::reader::Decoder {
293-
let schema: AvroSchema<'static> = serde_json::from_str(schema_json).unwrap();
328+
let schema = AvroSchema::new(schema_json.parse().unwrap());
329+
let mut store = arrow_avro::schema::SchemaStore::new();
330+
store.register(schema.clone()).unwrap();
294331
ReaderBuilder::new()
295-
.with_schema(schema)
332+
.with_writer_schema_store(store)
296333
.with_batch_size(batch_size)
297334
.with_utf8_view(utf8view)
298-
.build_decoder(io::empty())
335+
.build_decoder()
299336
.expect("failed to build decoder")
300337
}
301338

@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str = r#"{"type":"record","name":"ArrRec","fields":[{"name"
325362
const DECIMAL_SCHEMA: &str = r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"#;
326363
const UUID_SCHEMA: &str = r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"#;
327364
const FIXED_SCHEMA: &str = r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"#;
328-
const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
329365
const INTERVAL_SCHEMA: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"#;
366+
const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
330367
const ENUM_SCHEMA: &str = r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"#;
331368
const MIX_SCHEMA: &str = r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"#;
332369
const NEST_SCHEMA: &str = r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"#;
@@ -336,7 +373,13 @@ macro_rules! dataset {
336373
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
337374
let schema =
338375
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
339-
SIZES.iter().map(|&n| $gen_fn(&schema, n)).collect()
376+
let arrow_schema = AvroSchema::new($schema_json.to_string());
377+
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
378+
let prefix = make_prefix(fingerprint);
379+
SIZES
380+
.iter()
381+
.map(|&n| $gen_fn(&schema, n, &prefix))
382+
.collect()
340383
});
341384
};
342385
}
@@ -406,6 +449,14 @@ fn bench_scenario(
406449

407450
fn criterion_benches(c: &mut Criterion) {
408451
for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
452+
bench_scenario(
453+
c,
454+
"Interval",
455+
INTERVAL_SCHEMA,
456+
&INTERVAL_DATA,
457+
false,
458+
batch_size,
459+
);
409460
bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size);
410461
bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size);
411462
bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, batch_size);
@@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) {
480531
false,
481532
batch_size,
482533
);
483-
bench_scenario(
484-
c,
485-
"Interval",
486-
INTERVAL_SCHEMA,
487-
&INTERVAL_DATA,
488-
false,
489-
batch_size,
490-
);
491534
bench_scenario(
492535
c,
493536
"Enum(Dictionary)",

0 commit comments

Comments
 (0)