Skip to content

Commit 1d1cad9

Browse files
authored
Retrieve schema from schema registry client in JSONDeserializer if schema_str not provided (#1798)
* deserialize JSON using wire header schema ID and registry client * update docs
1 parent c32d69d commit 1d1cad9

File tree

1 file changed

+19
-5
lines changed

1 file changed

+19
-5
lines changed

src/confluent_kafka/schema_registry/json_schema.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -296,16 +296,18 @@ class JSONDeserializer(Deserializer):
296296
framing.
297297
298298
Args:
299-
schema_str (str, Schema):
299+
schema_str (str, Schema, optional):
300300
`JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
301301
Accepts schema as either a string or a :py:class:`Schema` instance.
302302
Note that string definitions cannot reference other schemas. For referencing other schemas,
303-
use a :py:class:`Schema` instance.
303+
use a :py:class:`Schema` instance. If not provided, schemas will be
304+
retrieved from schema_registry_client based on the schema ID in the
305+
wire header of each message.
304306
305307
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
306308
Converts a dict to a Python object instance.
307309
308-
schema_registry_client (SchemaRegistryClient, optional): Schema Registry client instance. Needed if ``schema_str`` is a schema referencing other schemas.
310+
schema_registry_client (SchemaRegistryClient, optional): Schema Registry client instance. Needed if ``schema_str`` is a schema referencing other schemas or is not provided.
309311
""" # noqa: E501
310312

311313
__slots__ = ['_parsed_schema', '_from_dict', '_registry', '_are_references_provided', '_schema']
@@ -320,10 +322,16 @@ def __init__(self, schema_str, from_dict=None, schema_registry_client=None):
320322
if self._are_references_provided and schema_registry_client is None:
321323
raise ValueError(
322324
"""schema_registry_client must be provided if "schema_str" is a Schema instance with references""")
325+
elif schema_str is None:
326+
if schema_registry_client is None:
327+
raise ValueError(
328+
"""schema_registry_client must be provided if "schema_str" is not provided"""
329+
)
330+
schema = schema_str
323331
else:
324332
raise TypeError('You must pass either str or Schema')
325333

326-
self._parsed_schema = json.loads(schema.schema_str)
334+
self._parsed_schema = json.loads(schema.schema_str) if schema else None
327335
self._schema = schema
328336
self._registry = schema_registry_client
329337

@@ -378,7 +386,13 @@ def __call__(self, data, ctx):
378386
self._parsed_schema,
379387
store=named_schemas))
380388
else:
381-
validate(instance=obj_dict, schema=self._parsed_schema)
389+
if self._parsed_schema is None:
390+
schema = self._registry.get_schema(schema_id)
391+
# TODO: cache the parsed schemas too?
392+
parsed_schema = json.loads(schema.schema_str)
393+
else:
394+
parsed_schema = self._parsed_schema
395+
validate(instance=obj_dict, schema=parsed_schema)
382396
except ValidationError as ve:
383397
raise SerializationError(ve.message)
384398

0 commit comments

Comments
 (0)