Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a feature for Avro Serializaer/Deserializer #1751

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from . import (_MAGIC_BYTE,
Schema,
RegisteredSchema,
topic_subject_name_strategy)
from confluent_kafka.serialization import (Deserializer,
SerializationError,
Expand Down Expand Up @@ -166,9 +167,9 @@ class AvroSerializer(Serializer):

schema_str (str or Schema):
Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
Accepts either a string or a :py:class:`Schema` instance. Note that string
Accepts either a string or a :py:class:`Schema` instance or a :py.class:`RegisteredSchema`. Note that string
definitions cannot reference other schemas. For referencing other schemas,
use a :py:class:`Schema` instance.
use a :py:class:`Schema` or :py.class:`RegisteredSchema` instance.

to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.

Expand All @@ -185,16 +186,20 @@ class AvroSerializer(Serializer):
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None):
self._registry = schema_registry_client
self._schema_id = None
self._known_subjects = set()

if isinstance(schema_str, str):
schema = _schema_loads(schema_str)
elif isinstance(schema_str, Schema):
schema = schema_str
elif isinstance(schema_str, RegisteredSchema):
schema = schema_str.schema
self._schema_id = schema_str.schema_id
self._known_subjects.add(schema_str.subject)
else:
raise TypeError('You must pass either schema string or schema object')

self._registry = schema_registry_client
self._schema_id = None
self._known_subjects = set()
raise TypeError('You must pass either schema string or schema object or RegisteredSchema object')

if to_dict is not None and not callable(to_dict):
raise ValueError("to_dict must be callable with the signature "
Expand Down Expand Up @@ -327,11 +332,11 @@ class AvroDeserializer(Deserializer):
schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
client instance.

schema_str (str, Schema, optional): Avro reader schema declaration Accepts
either a string or a :py:class:`Schema` instance. If not provided, the
schema_str (str, Schema, RegisteredSchema, optional): Avro reader schema declaration Accepts
either a string or a :py:class:`Schema` or :py.class:`RegisteredSchema` instance. If not provided, the
writer schema will be used as the reader schema. Note that string
definitions cannot reference other schemas. For referencing other schemas,
use a :py:class:`Schema` instance.
use a :py:class:`Schema` or :py.class:`RegisteredSchema` instance.

from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
Converts a dict to an instance of some object.
Expand All @@ -351,13 +356,17 @@ class AvroDeserializer(Deserializer):

def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False):
schema = None
schema_id = None
if schema_str is not None:
if isinstance(schema_str, str):
schema = _schema_loads(schema_str)
elif isinstance(schema_str, Schema):
schema = schema_str
elif isinstance(schema_str, RegisteredSchema):
schema = schema_str.schema
schema_id = schema_str.schema_id
else:
raise TypeError('You must pass either schema string or schema object')
raise TypeError('You must pass either schema string or schema object or RegisteredSchema object')

self._schema = schema
self._registry = schema_registry_client
Expand All @@ -366,8 +375,13 @@ def __init__(self, schema_registry_client, schema_str=None, from_dict=None, retu
if schema:
schema_dict = loads(self._schema.schema_str)
self._named_schemas = _resolve_named_schema(self._schema, schema_registry_client)
self._reader_schema = parse_schema(schema_dict,
named_schemas=self._named_schemas)
parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas)

if schema_id is None:
self._reader_schema = parsed_schema
else:
self._writer_schemas[schema_id] = parsed_schema
self._reader_schema = None
else:
self._named_schemas = None
self._reader_schema = None
Expand Down