Skip to content

Commit 227325f

Browse files
committed
Update after review
1 parent ffee00f commit 227325f

File tree

7 files changed

+93
-118
lines changed

7 files changed

+93
-118
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1663,8 +1663,8 @@ definitions:
16631663
$parameters:
16641664
type: object
16651665
additionalProperties: true
1666-
TypesPair:
1667-
title: Types Pair
1666+
TypesMap:
1667+
title: Types Map
16681668
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
16691669
type: object
16701670
required:
@@ -1696,37 +1696,33 @@ definitions:
16961696
enum: [SchemaTypeIdentifier]
16971697
schema_pointer:
16981698
title: Schema Path
1699-
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
1699+
description: List of nested fields defining the schema field path to extract. Defaults to [].
17001700
type: array
1701+
default: []
17011702
items:
17021703
- type: string
17031704
interpolation_content:
17041705
- config
17051706
key_pointer:
17061707
title: Key Path
1707-
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
1708+
description: List of potentially nested fields describing the full path of the field key to extract.
17081709
type: array
17091710
items:
17101711
- type: string
17111712
interpolation_content:
17121713
- config
17131714
type_pointer:
17141715
title: Type Path
1715-
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
1716+
description: List of potentially nested fields describing the full path of the field type to extract.
17161717
type: array
17171718
items:
17181719
- type: string
17191720
interpolation_content:
17201721
- config
1721-
is_nullable:
1722-
title: Is Nullable
1723-
description: Add null to defined field type. This field is automatically set by the CDK.
1724-
type: boolean
1725-
default: true
1726-
types_map:
1722+
types_mapping:
17271723
type: array
17281724
items:
1729-
- "$ref": "#/definitions/TypesPair"
1725+
- "$ref": "#/definitions/TypesMap"
17301726
$parameters:
17311727
type: object
17321728
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -645,7 +645,7 @@ class HttpResponseFilter(BaseModel):
645645
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
646646

647647

648-
class TypesPair(BaseModel):
648+
class TypesMap(BaseModel):
649649
target_type: Union[str, List[str]]
650650
current_type: Union[str, List[str]]
651651

@@ -654,25 +654,20 @@ class SchemaTypeIdentifier(BaseModel):
654654
type: Optional[Literal["SchemaTypeIdentifier"]] = None
655655
schema_pointer: List[str] = Field(
656656
...,
657-
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
657+
description="List of nested fields defining the schema field path to extract. Defaults to [].",
658658
title="Schema Path",
659659
)
660660
key_pointer: List[str] = Field(
661661
...,
662-
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
662+
description="List of potentially nested fields describing the full path of the field key to extract.",
663663
title="Key Path",
664664
)
665665
type_pointer: Optional[List[str]] = Field(
666666
None,
667-
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
667+
description="List of potentially nested fields describing the full path of the field type to extract.",
668668
title="Type Path",
669669
)
670-
is_nullable: Optional[bool] = Field(
671-
True,
672-
description="Add null to defined field type. This field is automatically set by the CDK.",
673-
title="Is Nullable",
674-
)
675-
types_map: Optional[List[TypesPair]] = None
670+
types_mapping: Optional[List[TypesMap]] = None
676671
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
677672

678673

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@
6161
# DynamicSchemaLoader
6262
"DynamicSchemaLoader.retriever": "SimpleRetriever",
6363
# SchemaTypeIdentifier
64-
"SchemaTypeIdentifier.types_map": "TypesPair",
64+
"SchemaTypeIdentifier.types_map": "TypesMap",
6565
}
6666

6767
# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -290,7 +290,7 @@
290290
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
291291
)
292292
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
293-
TypesPair as TypesPairModel,
293+
TypesMap as TypesMapModel,
294294
)
295295
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
296296
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
@@ -357,7 +357,7 @@
357357
InlineSchemaLoader,
358358
JsonFileSchemaLoader,
359359
SchemaTypeIdentifier,
360-
TypesPair,
360+
TypesMap,
361361
)
362362
from airbyte_cdk.sources.declarative.spec import Spec
363363
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
@@ -454,7 +454,7 @@ def _init_mappings(self) -> None:
454454
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
455455
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
456456
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
457-
TypesPairModel: self.create_types_pair,
457+
TypesMapModel: self.create_types_map,
458458
JwtAuthenticatorModel: self.create_jwt_authenticator,
459459
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
460460
ListPartitionRouterModel: self.create_list_partition_router,
@@ -1568,18 +1568,18 @@ def create_inline_schema_loader(
15681568
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})
15691569

15701570
@staticmethod
1571-
def create_types_pair(model: TypesPairModel, **kwargs: Any) -> TypesPair:
1572-
return TypesPair(target_type=model.target_type, current_type=model.current_type)
1571+
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
1572+
return TypesMap(target_type=model.target_type, current_type=model.current_type)
15731573

15741574
def create_schema_type_identifier(
15751575
self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any
15761576
) -> SchemaTypeIdentifier:
15771577
types_map = []
1578-
if model.types_map:
1578+
if model.types_mapping:
15791579
types_map.extend(
15801580
[
15811581
self._create_component_from_model(types_pair, config=config)
1582-
for types_pair in model.types_map
1582+
for types_pair in model.types_mapping
15831583
]
15841584
)
15851585
model_schema_pointer: List[Union[InterpolatedString, str]] = [
@@ -1590,14 +1590,11 @@ def create_schema_type_identifier(
15901590
[x for x in model.type_pointer] if model.type_pointer else None
15911591
)
15921592

1593-
assert model.is_nullable is not None # for mypy
1594-
15951593
return SchemaTypeIdentifier(
15961594
schema_pointer=model_schema_pointer,
15971595
key_pointer=model_key_pointer,
15981596
type_pointer=model_type_pointer,
15991597
types_map=types_map,
1600-
is_nullable=model.is_nullable,
16011598
parameters=model.parameters or {},
16021599
)
16031600

airbyte_cdk/sources/declarative/schema/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,6 @@
66
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
77
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
88
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
9-
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesPair, SchemaTypeIdentifier
9+
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesMap, SchemaTypeIdentifier
1010

11-
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", "TypesPair", "SchemaTypeIdentifier"]
11+
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader", "DynamicSchemaLoader", "TypesMap", "SchemaTypeIdentifier"]

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 33 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -16,36 +16,36 @@
1616
from airbyte_cdk.sources.source import ExperimentalClassWarning
1717
from airbyte_cdk.sources.types import Config
1818

19-
AIRBYTE_DATA_TYPES = {
20-
"string": {"type": "string"},
21-
"boolean": {"type": "boolean"},
22-
"date": {"type": "string", "format": "date"},
19+
AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = {
20+
"string": {"type": ["null", "string"]},
21+
"boolean": {"type": ["null", "boolean"]},
22+
"date": {"type": ["null", "string"], "format": "date"},
2323
"timestamp_without_timezone": {
24-
"type": "string",
24+
"type": ["null", "string"],
2525
"format": "date-time",
2626
"airbyte_type": "timestamp_without_timezone",
2727
},
28-
"timestamp_with_timezone": {"type": "string", "format": "date-time"},
28+
"timestamp_with_timezone": {"type": ["null", "string"], "format": "date-time"},
2929
"time_without_timezone": {
30-
"type": "string",
30+
"type": ["null", "string"],
3131
"format": "time",
3232
"airbyte_type": "time_without_timezone",
3333
},
3434
"time_with_timezone": {
35-
"type": "string",
35+
"type": ["null", "string"],
3636
"format": "time",
3737
"airbyte_type": "time_with_timezone",
3838
},
39-
"integer": {"type": "integer"},
40-
"number": {"type": "number"},
41-
"array": {"type": "array"},
42-
"object": {"type": "object"},
39+
"integer": {"type": ["null", "integer"]},
40+
"number": {"type": ["null", "number"]},
41+
"array": {"type": ["null", "array"]},
42+
"object": {"type": ["null", "object"]},
4343
}
4444

4545

4646
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
4747
@dataclass(frozen=True)
48-
class TypesPair:
48+
class TypesMap:
4949
"""
5050
Represents a mapping between a current type and its corresponding target type.
5151
"""
@@ -65,8 +65,7 @@ class SchemaTypeIdentifier:
6565
key_pointer: List[Union[InterpolatedString, str]]
6666
parameters: InitVar[Mapping[str, Any]]
6767
type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
68-
types_map: Optional[List[TypesPair]] = None
69-
is_nullable: bool = True
68+
types_map: Optional[List[TypesMap]] = None
7069

7170
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
7271
self.schema_pointer = self._update_pointer(self.schema_pointer, parameters) # type: ignore[assignment] # This is a required field in the model
@@ -108,19 +107,24 @@ def get_json_schema(self) -> Mapping[str, Any]:
108107
Constructs a JSON Schema based on retrieved data.
109108
"""
110109
properties = {}
111-
for retrieved_record in self.retriever.read_records({}):
112-
raw_schema = self._extract_data(
110+
retrieved_record = next(self.retriever.read_records({}), None) # type: ignore[call-overload] # read_records is typed as returning an Iterable, while next() expects an Iterator
111+
112+
raw_schema = (
113+
self._extract_data(
113114
retrieved_record, # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
114115
self.schema_type_identifier.schema_pointer,
115116
)
116-
for property_definition in raw_schema:
117-
key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
118-
value = self._get_type(
119-
property_definition,
120-
self.schema_type_identifier.type_pointer,
121-
is_nullable=self.schema_type_identifier.is_nullable,
122-
)
123-
properties[key] = value
117+
if retrieved_record
118+
else []
119+
)
120+
121+
for property_definition in raw_schema:
122+
key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
123+
value = self._get_type(
124+
property_definition,
125+
self.schema_type_identifier.type_pointer,
126+
)
127+
properties[key] = value
124128

125129
return {
126130
"$schema": "http://json-schema.org/draft-07/schema#",
@@ -145,7 +149,6 @@ def _get_type(
145149
self,
146150
raw_schema: MutableMapping[str, Any],
147151
field_type_path: Optional[List[Union[InterpolatedString, str]]],
148-
is_nullable: bool = True,
149152
) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
150153
"""
151154
Determines the JSON Schema type for a field, supporting nullable and combined types.
@@ -161,15 +164,11 @@ def _get_type(
161164
and len(mapped_field_type) == 2
162165
and all(isinstance(item, str) for item in mapped_field_type)
163166
):
164-
first_type = self._make_field_nullable(
165-
self._get_airbyte_type(mapped_field_type[0]), is_nullable
166-
)
167-
second_type = self._make_field_nullable(
168-
self._get_airbyte_type(mapped_field_type[1]), is_nullable
169-
)
167+
first_type = self._get_airbyte_type(mapped_field_type[0])
168+
second_type = self._get_airbyte_type(mapped_field_type[1])
170169
return {"oneOf": [first_type, second_type]}
171170
elif isinstance(mapped_field_type, str):
172-
return self._make_field_nullable(self._get_airbyte_type(mapped_field_type), is_nullable)
171+
return self._get_airbyte_type(mapped_field_type)
173172
else:
174173
raise ValueError(
175174
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
@@ -187,18 +186,6 @@ def _replace_type_if_not_valid(
187186
return types_pair.target_type
188187
return field_type
189188

190-
@staticmethod
191-
def _make_field_nullable(
192-
field_type: Mapping[str, Any], is_nullable: bool = True
193-
) -> Mapping[str, Any]:
194-
"""
195-
Wraps a field type to allow null values if `is_nullable` is True.
196-
"""
197-
updated_field_type = dict(deepcopy(field_type))
198-
if is_nullable:
199-
updated_field_type["type"] = ["null", updated_field_type["type"]]
200-
return updated_field_type
201-
202189
@staticmethod
203190
def _get_airbyte_type(field_type: str) -> Mapping[str, Any]:
204191
"""
@@ -227,9 +214,4 @@ def _extract_data(
227214
for path in extraction_path
228215
]
229216

230-
if "*" in path:
231-
extracted = dpath.values(body, path) # type: ignore # extracted will be a MutableMapping, given input data structure
232-
else:
233-
extracted = dpath.get(body, path, default=default) # type: ignore # extracted will be a MutableMapping, given input data structure
234-
235-
return extracted
217+
return dpath.get(body, path, default=default) # type: ignore # extracted will be a MutableMapping, given input data structure

0 commit comments

Comments
 (0)