Skip to content

Commit ddfde29

Browse files
author
Oleksandr Bazarnov
committed
add
1 parent bf998bd commit ddfde29

File tree

7 files changed

+248
-5
lines changed

7 files changed

+248
-5
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,16 @@ def run_test_read(
112112
record_limit = self._check_record_limit(record_limit)
113113
# The connector builder currently only supports reading from a single stream at a time
114114
stream = source.streams(config)[0]
115+
116+
# get any deprecation warnings during the component creation
117+
deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings()
118+
115119
schema_inferrer = SchemaInferrer(
116120
self._pk_to_nested_and_composite_field(stream.primary_key),
117121
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
118122
)
119123
datetime_format_inferrer = DatetimeFormatInferrer()
124+
120125
message_group = get_message_groups(
121126
self._read_stream(source, config, configured_catalog, state),
122127
schema_inferrer,
@@ -125,7 +130,7 @@ def run_test_read(
125130
)
126131

127132
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
128-
message_group
133+
message_group, deprecation_warnings
129134
)
130135
schema, log_messages = self._get_infered_schema(
131136
configured_catalog, schema_inferrer, log_messages
@@ -238,7 +243,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
238243

239244
return record_limit
240245

241-
def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
246+
def _categorise_groups(
247+
self,
248+
message_groups: MESSAGE_GROUPS,
249+
deprecation_warnings: Optional[List[Any]] = None,
250+
) -> GROUPED_MESSAGES:
242251
"""
243252
Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.
244253
@@ -269,6 +278,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
269278
auxiliary_requests = []
270279
latest_config_update: Optional[AirbyteControlMessage] = None
271280

281+
# process the message groups first
272282
for message_group in message_groups:
273283
match message_group:
274284
case AirbyteLogMessage():
@@ -298,6 +308,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
298308
case _:
299309
raise ValueError(f"Unknown message group type: {type(message_group)}")
300310

311+
# process deprecation warnings, if present
312+
if deprecation_warnings is not None:
313+
for deprecation in deprecation_warnings:
314+
match deprecation:
315+
case AirbyteLogMessage():
316+
log_messages.append(
317+
LogMessage(message=deprecation.message, level=deprecation.level.value)
318+
)
319+
case _:
320+
raise ValueError(f"Unknown message group type: {type(deprecation)}")
321+
301322
return slices, log_messages, auxiliary_requests, latest_config_update
302323

303324
def _get_infered_schema(

airbyte_cdk/sources/declarative/declarative_source.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
import logging
66
from abc import abstractmethod
7-
from typing import Any, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple
88

9+
from airbyte_cdk.models import (
10+
AirbyteLogMessage,
11+
)
912
from airbyte_cdk.sources.abstract_source import AbstractSource
1013
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1114

@@ -34,3 +37,9 @@ def check_connection(
3437
The error object will be cast to string to display the problem to the user.
3538
"""
3639
return self.connection_checker.check_connection(self, logger, config)
40+
41+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
42+
"""
43+
Returns a list of deprecation warnings for the source.
44+
"""
45+
return []

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from airbyte_cdk.models import (
1919
AirbyteConnectionStatus,
20+
AirbyteLogMessage,
2021
AirbyteMessage,
2122
AirbyteStateMessage,
2223
ConfiguredAirbyteCatalog,
@@ -123,6 +124,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
123124
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
124125
)
125126

127+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
128+
return self._constructor.get_model_deprecations() or []
129+
126130
@property
127131
def connection_checker(self) -> ConnectionChecker:
128132
check = self._source_config["check"]
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
# THIS IS A STATIC CLASS MODEL USED TO DISPLAY DEPRECATION WARNINGS
4+
# WHEN DEPRECATED FIELDS ARE ACCESSED
5+
6+
import warnings
7+
from typing import Any, List
8+
9+
from pydantic.v1 import BaseModel
10+
11+
from airbyte_cdk.models import (
12+
AirbyteLogMessage,
13+
Level,
14+
)
15+
16+
# format the warning message
17+
warnings.formatwarning = (
18+
lambda message, category, *args, **kwargs: f"{category.__name__}: {message}"
19+
)
20+
21+
FIELDS_TAG = "__fields__"
22+
DEPRECATED = "deprecated"
23+
DEPRECATION_MESSAGE = "deprecation_message"
24+
DEPRECATION_LOGS_TAG = "_deprecation_logs"
25+
26+
27+
class BaseModelWithDeprecations(BaseModel):
28+
"""
29+
Pydantic BaseModel that warns when deprecated fields are accessed.
30+
The deprecation message is stored in the field's extra attributes.
31+
This class is used to create models that can have deprecated fields
32+
and show warnings when those fields are accessed or initialized.
33+
34+
The `_deprecation_logs` attribute is storred in the model itself.
35+
The collected deprecation warnings are further proparated to the Airbyte log messages,
36+
during the component creation process, in `model_to_component._collect_model_deprecations()`.
37+
38+
The component implementation is not responsible for handling the deprecation warnings,
39+
since the deprecation warnings are already handled in the model itself.
40+
"""
41+
42+
class Config:
43+
"""
44+
Allow extra fields in the model. In case the model restricts extra fields.
45+
"""
46+
47+
extra = "allow"
48+
49+
_deprecation_logs: List[AirbyteLogMessage] = []
50+
51+
def __init__(self, **data: Any) -> None:
52+
"""
53+
Show warnings for deprecated fields during component initialization.
54+
"""
55+
model_fields = self.__fields__
56+
57+
for field_name in data:
58+
if field_name in model_fields:
59+
is_deprecated_field = model_fields[field_name].field_info.extra.get(
60+
DEPRECATED, False
61+
)
62+
if is_deprecated_field:
63+
deprecation_message = model_fields[field_name].field_info.extra.get(
64+
DEPRECATION_MESSAGE, ""
65+
)
66+
self._deprecated_warning(field_name, deprecation_message)
67+
68+
# Call the parent constructor
69+
super().__init__(**data)
70+
71+
def __getattribute__(self, name: str) -> Any:
72+
"""
73+
Show warnings for deprecated fields during field usage.
74+
"""
75+
76+
value = super().__getattribute__(name)
77+
78+
try:
79+
model_fields = super().__getattribute__(FIELDS_TAG)
80+
field_info = model_fields.get(name)
81+
is_deprecated_field = (
82+
field_info.field_info.extra.get(DEPRECATED, False) if field_info else False
83+
)
84+
if is_deprecated_field:
85+
deprecation_message = field_info.extra.get(DEPRECATION_MESSAGE, "")
86+
self._deprecated_warning(name, deprecation_message)
87+
except (AttributeError, KeyError):
88+
pass
89+
90+
return value
91+
92+
def _deprecated_warning(self, field_name: str, message: str) -> None:
93+
"""
94+
Show a warning message for deprecated fields (to stdout).
95+
Args:
96+
field_name (str): Name of the deprecated field.
97+
message (str): Warning message to be displayed.
98+
"""
99+
100+
# Emit a warning message for deprecated fields (to stdout) (Python Default behavior)
101+
warnings.warn(
102+
f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}",
103+
DeprecationWarning,
104+
)
105+
106+
# Add the deprecation message to the Airbyte log messages,
107+
# this logs are displayed in the Connector Builder.
108+
self._deprecation_logs.append(
109+
AirbyteLogMessage(
110+
level=Level.WARN,
111+
message=f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}",
112+
),
113+
)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from isodate import parse_duration
2828
from pydantic.v1 import BaseModel
2929

30-
from airbyte_cdk.models import FailureType, Level
30+
from airbyte_cdk.models import AirbyteLogMessage, FailureType, Level
3131
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
3232
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
3333
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
@@ -108,6 +108,10 @@
108108
CustomStateMigration,
109109
GzipDecoder,
110110
)
111+
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
112+
DEPRECATION_LOGS_TAG,
113+
BaseModelWithDeprecations,
114+
)
111115
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
112116
AddedFieldDefinition as AddedFieldDefinitionModel,
113117
)
@@ -584,6 +588,8 @@ def __init__(
584588
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
585589
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
586590
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
591+
# placeholder for deprecation warnings
592+
self._deprecation_logs: List[AirbyteLogMessage] = []
587593

588594
def _init_mappings(self) -> None:
589595
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -730,8 +736,26 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
730736
component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
731737
if not component_constructor:
732738
raise ValueError(f"Could not find constructor for {model.__class__}")
739+
740+
# collect deprecation warnings for supported models.
741+
if isinstance(model, BaseModelWithDeprecations):
742+
self._collect_model_deprecations(model)
743+
733744
return component_constructor(model=model, config=config, **kwargs)
734745

746+
def get_model_deprecations(self) -> List[Any]:
747+
"""
748+
Returns the deprecation warnings that were collected during the creation of components.
749+
"""
750+
return self._deprecation_logs
751+
752+
def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None:
753+
if hasattr(model, DEPRECATION_LOGS_TAG) and model._deprecation_logs is not None:
754+
for log in model._deprecation_logs:
755+
# avoid duplicates for deprecation logs observed.
756+
if log not in self._deprecation_logs:
757+
self._deprecation_logs.append(log)
758+
735759
@staticmethod
736760
def create_added_field_definition(
737761
model: AddedFieldDefinitionModel, config: Config, **kwargs: Any

bin/generate_component_manifest_files.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

3+
import re
34
import sys
45
from glob import glob
56
from pathlib import Path
@@ -28,6 +29,63 @@ def generate_init_module_content() -> str:
2829
return header
2930

3031

32+
def replace_base_model_for_classes_with_deprecated_fields(post_processed_content: str) -> str:
33+
"""
34+
Replace the base model for classes with deprecated fields.
35+
This function looks for classes that inherit from `BaseModel` and have fields marked as deprecated.
36+
It replaces the base model with `BaseModelWithDeprecations` for those classes.
37+
"""
38+
39+
# Find classes with deprecated fields
40+
classes_with_deprecated_fields = set()
41+
class_matches = re.finditer(r"class (\w+)\(BaseModel\):", post_processed_content)
42+
43+
for class_match in class_matches:
44+
class_name = class_match.group(1)
45+
class_start = class_match.start()
46+
# Find the next class definition or end of file
47+
next_class_match = re.search(
48+
r"class \w+\(",
49+
post_processed_content[class_start + len(class_match.group(0)) :],
50+
)
51+
class_end = (
52+
len(post_processed_content)
53+
if next_class_match is None
54+
else class_start + len(class_match.group(0)) + next_class_match.start()
55+
)
56+
class_content = post_processed_content[class_start:class_end]
57+
58+
# Check if any field has deprecated=True
59+
if re.search(r"deprecated\s*=\s*True", class_content):
60+
classes_with_deprecated_fields.add(class_name)
61+
62+
# update the imports to include the new base model with deprecation warinings
63+
# only if there are classes with the fields marked as deprecated.
64+
if len(classes_with_deprecated_fields) > 0:
65+
# Find where to insert the base model - after imports but before class definitions
66+
imports_end = post_processed_content.find(
67+
"\n\n",
68+
post_processed_content.find("from pydantic.v1 import"),
69+
)
70+
if imports_end > 0:
71+
post_processed_content = (
72+
post_processed_content[:imports_end]
73+
+ "\n\n"
74+
+ "from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (\n"
75+
+ " BaseModelWithDeprecations,\n"
76+
+ ")"
77+
+ post_processed_content[imports_end:]
78+
)
79+
80+
# Use the `BaseModelWithDeprecations` base model for the classes with deprecated fields
81+
for class_name in classes_with_deprecated_fields:
82+
pattern = rf"class {class_name}\(BaseModel\):"
83+
replacement = f"class {class_name}(BaseModelWithDeprecations):"
84+
post_processed_content = re.sub(pattern, replacement, post_processed_content)
85+
86+
return post_processed_content
87+
88+
3189
async def post_process_codegen(codegen_container: dagger.Container):
3290
codegen_container = codegen_container.with_exec(
3391
["mkdir", "/generated_post_processed"], use_entrypoint=True
@@ -41,6 +99,11 @@ async def post_process_codegen(codegen_container: dagger.Container):
4199
post_processed_content = original_content.replace(
42100
" _parameters:", " parameters:"
43101
).replace("from pydantic", "from pydantic.v1")
102+
103+
post_processed_content = replace_base_model_for_classes_with_deprecated_fields(
104+
post_processed_content
105+
)
106+
44107
codegen_container = codegen_container.with_new_file(
45108
f"/generated_post_processed/{generated_file}", contents=post_processed_content
46109
)
@@ -75,6 +138,12 @@ async def main():
75138
"--set-default-enum-member",
76139
"--use-double-quotes",
77140
"--remove-special-field-name-prefix",
141+
# allow usage of the extra key such as `deprecated`, etc.
142+
"--field-extra-keys",
143+
# account the `deprecated` flag provided for the field.
144+
"deprecated",
145+
# account the `deprecation_message` provided for the field.
146+
"deprecation_message",
78147
],
79148
use_entrypoint=True,
80149
)

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import json
88
import logging
99
import os
10-
from typing import Literal
10+
from typing import List, Literal
1111
from unittest import mock
1212
from unittest.mock import MagicMock, patch
1313

@@ -818,6 +818,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
818818
connector_specification.connectionSpecification = {}
819819
return connector_specification
820820

821+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
822+
return []
823+
821824
@property
822825
def check_config_against_spec(self) -> Literal[False]:
823826
return False

0 commit comments

Comments
 (0)