Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1548,13 +1548,15 @@ definitions:
loaders defined first taking precedence in the event of a conflict.
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/InferredSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- title: Multiple Schema Loaders
type: array
items:
anyOf:
- "$ref": "#/definitions/InlineSchemaLoader"
- "$ref": "#/definitions/InferredSchemaLoader"
- "$ref": "#/definitions/DynamicSchemaLoader"
- "$ref": "#/definitions/JsonFileSchemaLoader"
- "$ref": "#/definitions/CustomSchemaLoader"
Expand Down Expand Up @@ -2462,6 +2464,40 @@ definitions:
$parameters:
type: object
additionalProperties: true
InferredSchemaLoader:
title: Inferred Schema Loader
description: Infers a JSON Schema by reading a sample of records from the stream at discover time. This is useful for streams where the schema is not known in advance or changes dynamically.
type: object
required:
- type
- retriever
properties:
type:
type: string
enum: [InferredSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/SimpleRetriever"
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
record_sample_size:
title: Record Sample Size
description: The maximum number of records to read for schema inference. Defaults to 100.
type: integer
default: 100
example:
- 100
- 500
- 1000
stream_name:
title: Stream Name
description: The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.
type: string
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2483,11 +2481,13 @@ class Config:
schema_loader: Optional[
Union[
InlineSchemaLoader,
InferredSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
List[
Union[
InlineSchemaLoader,
InferredSchemaLoader,
DynamicSchemaLoader,
JsonFileSchemaLoader,
CustomSchemaLoader,
Expand Down Expand Up @@ -2753,6 +2753,27 @@ class DynamicSchemaLoader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InferredSchemaLoader(BaseModel):
type: Literal["InferredSchemaLoader"]
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
record_sample_size: Optional[int] = Field(
100,
description="The maximum number of records to read for schema inference. Defaults to 100.",
example=[100, 500, 1000],
title="Record Sample Size",
)
stream_name: Optional[str] = Field(
None,
description="The name of the stream for which to infer the schema. If not provided, it will be inferred from the stream context.",
title="Stream Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
Expand Down Expand Up @@ -3093,6 +3114,7 @@ class DynamicDeclarativeStream(BaseModel):
SessionTokenAuthenticator.update_forward_refs()
HttpRequester.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
InferredSchemaLoader.update_forward_refs()
ParentStreamConfig.update_forward_refs()
PropertiesFromEndpoint.update_forward_refs()
SimpleRetriever.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InferredSchemaLoader as InferredSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
InlineSchemaLoader as InlineSchemaLoaderModel,
)
Expand Down Expand Up @@ -549,6 +552,7 @@
ComplexFieldType,
DefaultSchemaLoader,
DynamicSchemaLoader,
InferredSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
SchemaLoader,
Expand Down Expand Up @@ -748,6 +752,7 @@ def _init_mappings(self) -> None:
HttpRequesterModel: self.create_http_requester,
HttpResponseFilterModel: self.create_http_response_filter,
InlineSchemaLoaderModel: self.create_inline_schema_loader,
InferredSchemaLoaderModel: self.create_inferred_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
Expand Down Expand Up @@ -2500,6 +2505,39 @@ def create_inline_schema_loader(
) -> InlineSchemaLoader:
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})

def create_inferred_schema_loader(
self, model: InferredSchemaLoaderModel, config: Config, **kwargs: Any
) -> InferredSchemaLoader:
name = kwargs.get("name", "inferred_schema")
retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name=name,
primary_key=None,
partition_router=self._build_stream_slicer_from_partition_router(
model.retriever, config
),
transformations=[],
use_cache=True,
log_formatter=(
lambda response: format_http_message(
response,
f"Schema loader '{name}' request",
f"Request performed in order to infer schema.",
name,
is_auxiliary=True,
)
),
)

return InferredSchemaLoader(
retriever=retriever,
config=config,
record_sample_size=model.record_sample_size or 100,
stream_name=model.stream_name or name,
parameters=model.parameters or {},
)

def create_complex_field_type(
self, model: ComplexFieldTypeModel, config: Config, **kwargs: Any
) -> ComplexFieldType:
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
SchemaTypeIdentifier,
TypesMap,
)
from airbyte_cdk.sources.declarative.schema.inferred_schema_loader import InferredSchemaLoader
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
Expand All @@ -18,6 +19,7 @@
"DefaultSchemaLoader",
"SchemaLoader",
"InlineSchemaLoader",
"InferredSchemaLoader",
"DynamicSchemaLoader",
"ComplexFieldType",
"TypesMap",
Expand Down
87 changes: 87 additions & 0 deletions airbyte_cdk/sources/declarative/schema/inferred_schema_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from collections.abc import Mapping as ABCMapping
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer


@dataclass
class InferredSchemaLoader(SchemaLoader):
"""
Infers a JSON Schema by reading a sample of records from the stream at discover time.

This schema loader reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema based on the structure of those records.
This is useful for streams where the schema is not known in advance or changes dynamically.

Attributes:
retriever (Retriever): The retriever used to fetch records from the stream
config (Config): The user-provided configuration as specified by the source's spec
parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
stream_name (str): The name of the stream for which to infer the schema
"""

retriever: Retriever
config: Config
parameters: InitVar[Mapping[str, Any]]
record_sample_size: int = 100
stream_name: str = ""

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
if not self.stream_name:
self.stream_name = parameters.get("name", "")

def get_json_schema(self) -> Mapping[str, Any]:
"""
Infers and returns a JSON schema by reading a sample of records from the stream.

This method reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema. If no records are available,
it returns an empty schema.

Returns:
A mapping representing the inferred JSON schema for the stream
"""
schema_inferrer = SchemaInferrer()

record_count = 0
try:
for stream_slice in self.retriever.stream_slices():
for record in self.retriever.read_records(
records_schema={}, stream_slice=stream_slice
):
if record_count >= self.record_sample_size:
break

if isinstance(record, ABCMapping) and not isinstance(record, dict):
record = dict(record)

airbyte_record = AirbyteRecordMessage(
stream=self.stream_name,
data=record, # type: ignore[arg-type]
emitted_at=0,
)

schema_inferrer.accumulate(airbyte_record)
record_count += 1

if record_count >= self.record_sample_size:
break
except Exception:
return {}

inferred_schema: Optional[Mapping[str, Any]] = schema_inferrer.get_stream_schema(
self.stream_name
)

return inferred_schema if inferred_schema else {}
Loading
Loading