Skip to content

Commit 4b73b46

Browse files
lazebnyioctavia-squidington-iii
andauthored
feat(cdk): add schema filter to DynamicSchemaLoader (#550)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent ee1fa46 commit 4b73b46

File tree

5 files changed

+44
-6
lines changed

5 files changed

+44
-6
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2318,6 +2318,12 @@ definitions:
23182318
- "$ref": "#/definitions/AsyncRetriever"
23192319
- "$ref": "#/definitions/CustomRetriever"
23202320
- "$ref": "#/definitions/SimpleRetriever"
2321+
schema_filter:
2322+
title: Schema Filter
2323+
description: Responsible for filtering fields to be added to json schema.
2324+
anyOf:
2325+
- "$ref": "#/definitions/RecordFilter"
2326+
- "$ref": "#/definitions/CustomRecordFilter"
23212327
schema_transformations:
23222328
title: Schema Transformations
23232329
description: A list of transformations to be applied to the schema.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -2582,6 +2580,11 @@ class DynamicSchemaLoader(BaseModel):
25822580
description="Component used to coordinate how records are extracted across stream slices and request pages.",
25832581
title="Retriever",
25842582
)
2583+
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
2584+
None,
2585+
description="Responsible for filtering fields to be added to json schema.",
2586+
title="Schema Filter",
2587+
)
25852588
schema_transformations: Optional[
25862589
List[
25872590
Union[

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2544,10 +2544,19 @@ def create_dynamic_schema_loader(
25442544
schema_type_identifier = self._create_component_from_model(
25452545
model.schema_type_identifier, config=config, parameters=model.parameters or {}
25462546
)
2547+
schema_filter = (
2548+
self._create_component_from_model(
2549+
model.schema_filter, config=config, parameters=model.parameters or {}
2550+
)
2551+
if model.schema_filter is not None
2552+
else None
2553+
)
2554+
25472555
return DynamicSchemaLoader(
25482556
retriever=retriever,
25492557
config=config,
25502558
schema_transformations=schema_transformations,
2559+
schema_filter=schema_filter,
25512560
schema_type_identifier=schema_type_identifier,
25522561
parameters=model.parameters or {},
25532562
)

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import dpath
1111
from typing_extensions import deprecated
1212

13+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1314
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1415
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1516
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
@@ -126,6 +127,7 @@ class DynamicSchemaLoader(SchemaLoader):
126127
parameters: InitVar[Mapping[str, Any]]
127128
schema_type_identifier: SchemaTypeIdentifier
128129
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
130+
schema_filter: Optional[RecordFilter] = None
129131

130132
def get_json_schema(self) -> Mapping[str, Any]:
131133
"""
@@ -151,20 +153,18 @@ def get_json_schema(self) -> Mapping[str, Any]:
151153
)
152154
properties[key] = value
153155

154-
transformed_properties = self._transform(properties, {})
156+
filtered_transformed_properties = self._transform(self._filter(properties))
155157

156158
return {
157159
"$schema": "https://json-schema.org/draft-07/schema#",
158160
"type": "object",
159161
"additionalProperties": True,
160-
"properties": transformed_properties,
162+
"properties": filtered_transformed_properties,
161163
}
162164

163165
def _transform(
164166
self,
165167
properties: Mapping[str, Any],
166-
stream_state: StreamState,
167-
stream_slice: Optional[StreamSlice] = None,
168168
) -> Mapping[str, Any]:
169169
for transformation in self.schema_transformations:
170170
transformation.transform(
@@ -173,6 +173,21 @@ def _transform(
173173
)
174174
return properties
175175

176+
def _filter(
177+
self,
178+
properties: Mapping[str, Any],
179+
) -> Mapping[str, Any]:
180+
if not self.schema_filter:
181+
return properties
182+
183+
filtered_properties: MutableMapping[str, Any] = {}
184+
for item in self.schema_filter.filter_records(
185+
({k: v} for k, v in properties.items()),
186+
{},
187+
):
188+
filtered_properties.update(item)
189+
return filtered_properties
190+
176191
def _get_key(
177192
self,
178193
raw_schema: MutableMapping[str, Any],

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
},
6969
"paginator": {"type": "NoPagination"},
7070
},
71+
"schema_filter": {
72+
"type": "RecordFilter",
73+
"condition": "{{ 'filtered_field' not in record }}",
74+
},
7175
"schema_transformations": [
7276
{
7377
"type": "AddFields",
@@ -390,6 +394,7 @@ def test_dynamic_schema_loader_with_type_conditions():
390394
body=json.dumps(
391395
{
392396
"fields": [
397+
{"name": "filtered_field", "type": "string"},
393398
{"name": "Id", "type": "integer"},
394399
{"name": "FirstName", "type": "string"},
395400
{"name": "Description", "type": "singleLineText"},

0 commit comments

Comments
 (0)