diff --git a/src/confluent_kafka/schema_registry/avro.py b/src/confluent_kafka/schema_registry/avro.py index 9b5209909..a8f321a3b 100644 --- a/src/confluent_kafka/schema_registry/avro.py +++ b/src/confluent_kafka/schema_registry/avro.py @@ -25,6 +25,7 @@ from . import (_MAGIC_BYTE, Schema, + RegisteredSchema, topic_subject_name_strategy) from confluent_kafka.serialization import (Deserializer, SerializationError, @@ -166,9 +167,9 @@ class AvroSerializer(Serializer): schema_str (str or Schema): Avro `Schema Declaration. `_ - Accepts either a string or a :py:class:`Schema` instance. Note that string + Accepts either a string or a :py:class:`Schema` instance or a :py.class:`RegisteredSchema`. Note that string definitions cannot reference other schemas. For referencing other schemas, - use a :py:class:`Schema` instance. + use a :py:class:`Schema` or :py.class:`RegisteredSchema` instance. to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict. @@ -185,16 +186,20 @@ class AvroSerializer(Serializer): 'subject.name.strategy': topic_subject_name_strategy} def __init__(self, schema_registry_client, schema_str, to_dict=None, conf=None): + self._registry = schema_registry_client + self._schema_id = None + self._known_subjects = set() + if isinstance(schema_str, str): schema = _schema_loads(schema_str) elif isinstance(schema_str, Schema): schema = schema_str + elif isinstance(schema_str, RegisteredSchema): + schema = schema_str.schema + self._schema_id = schema_str.schema_id + self._known_subjects.add(schema_str.subject) else: - raise TypeError('You must pass either schema string or schema object') - - self._registry = schema_registry_client - self._schema_id = None - self._known_subjects = set() + raise TypeError('You must pass either schema string or schema object or RegisteredSchema object') if to_dict is not None and not callable(to_dict): raise ValueError("to_dict must be callable with the signature " @@ -327,11 +332,11 @@ class AvroDeserializer(Deserializer): schema_registry_client (SchemaRegistryClient): Confluent Schema Registry client instance. - schema_str (str, Schema, optional): Avro reader schema declaration Accepts - either a string or a :py:class:`Schema` instance. If not provided, the + schema_str (str, Schema, RegisteredSchema, optional): Avro reader schema declaration Accepts + either a string or a :py:class:`Schema` or :py.class:`RegisteredSchema` instance. If not provided, the writer schema will be used as the reader schema. Note that string definitions cannot reference other schemas. For referencing other schemas, - use a :py:class:`Schema` instance. + use a :py:class:`Schema` or :py.class:`RegisteredSchema` instance. from_dict (callable, optional): Callable(dict, SerializationContext) -> object. Converts a dict to an instance of some object. @@ -351,13 +356,17 @@ class AvroDeserializer(Deserializer): def __init__(self, schema_registry_client, schema_str=None, from_dict=None, return_record_name=False): schema = None + schema_id = None if schema_str is not None: if isinstance(schema_str, str): schema = _schema_loads(schema_str) elif isinstance(schema_str, Schema): schema = schema_str + elif isinstance(schema_str, RegisteredSchema): + schema = schema_str.schema + schema_id = schema_str.schema_id else: - raise TypeError('You must pass either schema string or schema object') + raise TypeError('You must pass either schema string or schema object or RegisteredSchema object') self._schema = schema self._registry = schema_registry_client @@ -366,8 +375,13 @@ def __init__(self, schema_registry_client, schema_str=None, from_dict=None, retu if schema: schema_dict = loads(self._schema.schema_str) self._named_schemas = _resolve_named_schema(self._schema, schema_registry_client) - self._reader_schema = parse_schema(schema_dict, - named_schemas=self._named_schemas) + parsed_schema = parse_schema(schema_dict, named_schemas=self._named_schemas) + + if schema_id is None: + self._reader_schema = parsed_schema + else: + self._writer_schemas[schema_id] = parsed_schema + self._reader_schema = None else: self._named_schemas = None self._reader_schema = None