Generate Avro Schemas from a Python class
python 3.7+
pip install dataclasses-avroschemahttps://marcosschroh.github.io/dataclasses-avroschema/
from dataclasses import dataclass
import typing
from dataclasses_avroschema import AvroModel, types
@dataclass
class User(AvroModel):
"An User"
name: str
age: int
pets: typing.List[str]
accounts: typing.Dict[str, int]
favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"])
country: str = "Argentina"
address: str = None
class Meta:
namespace = "User.v1"
aliases = ["user-v1", "super user"]
User.avro_schema()
'{
"type": "record",
"name": "User",
"doc": "An User",
"namespace": "User.v1",
"aliases": ["user-v1", "super user"],
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "long"},
{"name": "pets", "type": "array", "items": "string"},
{"name": "accounts", "type": "map", "values": "long"},
{"name": "favorite_colors", "type": "enum", "symbols": ["BLUE", "YELLOW", "GREEN"]},
{"name": "country", "type": "string", "default": "Argentina"},
{"name": "address", "type": ["null", "string"], "default": null}
]
}'
User.avro_schema_to_python()
{
"type": "record",
"name": "User",
"doc": "An User",
"namespace": "User.v1",
"aliases": ["user-v1", "super user"],
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "long"},
{"name": "pets", "type": {"type": "array", "items": "string", "name": "pet"}},
{"name": "accounts", "type": {"type": "map", "values": "long", "name": "account"}},
{"name": "favorite_colors", "type": {"type": "enum", "name": "favorite_color", "symbols": ["BLUE", "YELLOW", "GREEN"]}},
{"name": "country", "type": "string", "default": "Argentina"},
{"name": "address", "type": ["null", "string"], "default": None}
],
}For serialization is neccesary to use python class/dataclasses instance
from dataclasses import dataclass
import typing
from dataclasses_avroschema import AvroModel
@dataclass
class Address(AvroModel):
"An Address"
street: str
street_number: int
@dataclass
class User(AvroModel):
"User with multiple Address"
name: str
age: int
addresses: typing.List[Address]
address_data = {
"street": "test",
"street_number": 10,
}
# create an Address instance
address = Address(**address_data)
data_user = {
"name": "john",
"age": 20,
"addresses": [address],
}
# create an User instance
user = User(**data_user)
user.serialize()
# >>> b"\x08john(\x02\x08test\x14\x00"
user.serialize(serialization_type="avro-json")
# >>> b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'
# Get the json from the instance
user.to_json()
# python dict >>> {'name': 'john', 'age': 20, 'addresses': [{'street': 'test', 'street_number': 10}]}Deserialization could take place with an instance dataclass or the dataclass itself. Can return the dict representation or a new class instance
import typing
from dataclasses_avroschema import AvroModel
class Address(AvroModel):
"An Address"
street: str
street_number: int
class User(AvroModel):
"User with multiple Address"
name: str
age: int
addresses: typing.List[Address]
avro_binary = b"\x08john(\x02\x08test\x14\x00"
avro_json_binary = b'{"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}'
# return a new class instance!!
User.deserialize(avro_binary)
# >>>> User(name='john', age=20, addresses=[Address(street='test', street_number=10)])
# return a python dict
User.deserialize(avro_binary, create_instance=False)
# >>> {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}
# return a new class instance!!
User.deserialize(avro_json_binary, serialization_type="avro-json")
# >>>> User(name='john', age=20, addresses=[Address(street='test', street_number=10)])
# return a python dict
User.deserialize(avro_json_binary, serialization_type="avro-json", create_instance=False)
# >>> {"name": "john", "age": 20, "addresses": [{"street": "test", "street_number": 10}]}Under examples folder you can find 3 differents kafka examples, one with aiokafka (async) showing the simplest use case when a AvroModel instance is serialized and sent it thorught kafka, and the event is consumed.
The other two examples are sync using the kafka-python driver, where the avro-json serialization and schema evolution (FULL compatibility) is shown.
Also, there are two redis examples using redis streams with walrus and redisgears-py
Dataclasses Avro Schema also includes a factory feature, so you can generate fast python instances and use them, for example, to test your data streaming pipelines. Instances can be genrated using the fake method.
import typing
from dataclasses_avroschema import AvroModel
class Address(AvroModel):
"An Address"
street: str
street_number: int
class User(AvroModel):
"User with multiple Address"
name: str
age: int
addresses: typing.List[Address]
Address.fake()
# >>>> Address(street='PxZJILDRgbXyhWrrPWxQ', street_number=2067)
User.fake()
# >>>> User(name='VGSBbOGfSGjkMDnefHIZ', age=8974, addresses=[Address(street='vNpPYgesiHUwwzGcmMiS', street_number=4790)])- Primitive types: int, long, float, boolean, string and null support
- Complex types: enum, array, map, fixed, unions and records support
- Logical Types: date, time, datetime, uuid support
- Schema relations (oneToOne, oneToMany)
- Recursive Schemas
- Generate Avro Schemas from
faust.Record - Instance serialization correspondent to
avro schemagenerated - Data deserialization. Return python dict or class instance
- Generate json from python class instance
- Examples of integration with
kafkadrivers: aiokafka, kafka-python - Example of integration with
redisdrivers: walrus and redisgears-py - Factory instances
- Create a
virtualenv:python3.7 -m venv venv && source venv/bin/activate - Install requirements:
pip install -r requirements.txt - Code linting:
./scripts/lint - Run tests:
./scripts/test