
Commit 8a79834

Merge branch 'main' into brian/missing_stream_emit_incomplete_status

2 parents: a0ece6b + 9a075a1

13 files changed: +591 -62 lines

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 1 deletion
```diff
@@ -3797,7 +3797,6 @@ definitions:
         - polling_requester
         - download_requester
         - status_extractor
-        - download_target_extractor
       properties:
         type:
           type: string
```
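Dropping `download_target_extractor` from the `required` list means a manifest can omit it and still validate against the component schema. A minimal sketch of what that change means for validation (using `jsonschema` directly is an assumption for illustration; the CDK actually generates pydantic models from this YAML):

```python
from jsonschema import validate

# Toy schema mirroring the edit above: download_target_extractor is a known
# property but no longer listed in `required`.
schema = {
    "type": "object",
    "required": ["status_extractor"],
    "properties": {
        "status_extractor": {"type": "object"},
        "download_target_extractor": {"type": "object"},
    },
}

validate({"status_extractor": {}}, schema)  # passes without download_target_extractor
```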

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 17 additions & 3 deletions
```diff
@@ -189,6 +189,7 @@ def __init__(
         # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
         self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
         self._synced_some_data = False
+        self._logged_regarding_datetime_format_error = False

     @property
     def cursor_field(self) -> CursorField:
@@ -518,10 +519,23 @@ def observe(self, record: Record) -> None:
         except ValueError:
             return

+        try:
+            record_cursor = self._connector_state_converter.output_format(
+                self._connector_state_converter.parse_value(record_cursor_value)
+            )
+        except ValueError as exception:
+            if not self._logged_regarding_datetime_format_error:
+                logger.warning(
+                    "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s",
+                    self._stream_name,
+                    self._cursor_field.cursor_field_key,
+                    record_cursor_value,
+                    exception,
+                )
+                self._logged_regarding_datetime_format_error = True
+            return
+
         self._synced_some_data = True
-        record_cursor = self._connector_state_converter.output_format(
-            self._connector_state_converter.parse_value(record_cursor_value)
-        )
         self._update_global_cursor(record_cursor)
         if not self._use_global_cursor:
             self._cursor_per_partition[
```
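The change above makes `observe()` skip a record's cursor update when the cursor value cannot be parsed, warning only once per cursor instance instead of failing the sync or flooding the logs. A minimal standalone sketch of that log-once-and-skip pattern (names and the `float` parse are illustrative stand-ins, not the CDK API):

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class CursorSketch:
    """Illustrative stand-in: parse each observed cursor value, skip the
    cursor update (and warn only once) when parsing fails."""

    def __init__(self) -> None:
        self._logged_parse_error = False
        self.latest_cursor: Optional[float] = None

    def observe(self, raw_value: str) -> None:
        try:
            parsed = float(raw_value)  # stand-in for the parse_value/output_format round trip
        except ValueError as exception:
            if not self._logged_parse_error:
                logger.warning("Skipping cursor update for value %r: %s", raw_value, exception)
                self._logged_parse_error = True
            return  # the record still syncs; only the cursor update is skipped
        if self.latest_cursor is None or parsed > self.latest_cursor:
            self.latest_cursor = parsed


cursor = CursorSketch()
cursor.observe("not-a-timestamp")  # warns once
cursor.observe("still-bad")        # skipped silently, no second warning
cursor.observe("1700000000")       # cursor advances
```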

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -2852,8 +2852,8 @@ class AsyncRetriever(BaseModel):
     status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
         ..., description="Responsible for fetching the actual status of the async job."
     )
-    download_target_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
-        ...,
+    download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
+        None,
         description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
     )
     download_extractor: Optional[
```
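For context: in these generated pydantic models an ellipsis default (`...`) marks a field as required, while a `None` default makes it optional, so the hunk above is what turns `download_target_extractor` into an optional field. A minimal sketch of the two defaults (hypothetical model, v1-style `Field` usage as in the generated code):

```python
from typing import Optional
from pydantic import BaseModel, Field


class Example(BaseModel):
    required_extractor: str = Field(..., description="`...` means validation fails if absent.")
    optional_extractor: Optional[str] = Field(None, description="`None` makes the field optional.")


Example(required_extractor="DpathExtractor")  # ok; optional_extractor defaults to None
# Example()                                   # would raise ValidationError: field required
```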

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 53 additions & 12 deletions
```diff
@@ -1976,7 +1976,10 @@ def create_default_stream(
         primary_key = model.primary_key.__root__ if model.primary_key else None

         partition_router = self._build_stream_slicer_from_partition_router(
-            model.retriever, config, stream_name=model.name
+            model.retriever,
+            config,
+            stream_name=model.name,
+            **kwargs,
         )
         concurrent_cursor = self._build_concurrent_cursor(model, partition_router, config)
         if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
@@ -2155,10 +2158,11 @@ def _build_stream_slicer_from_partition_router(
         ],
         config: Config,
         stream_name: Optional[str] = None,
+        **kwargs: Any,
     ) -> PartitionRouter:
         if (
             hasattr(model, "partition_router")
-            and isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel)
+            and isinstance(model, (SimpleRetrieverModel, AsyncRetrieverModel, CustomRetrieverModel))
             and model.partition_router
         ):
             stream_slicer_model = model.partition_router
@@ -2172,6 +2176,23 @@
                 ],
                 parameters={},
             )
+            elif isinstance(stream_slicer_model, dict):
+                # partition router comes from CustomRetrieverModel therefore has not been parsed as a model
+                params = stream_slicer_model.get("$parameters")
+                if not isinstance(params, dict):
+                    params = {}
+                    stream_slicer_model["$parameters"] = params
+
+                if stream_name is not None:
+                    params["stream_name"] = stream_name
+
+                return self._create_nested_component(  # type: ignore[no-any-return] # There is no guarantee that this will return a stream slicer. If not, we expect an AttributeError during the call to `stream_slices`
+                    model,
+                    "partition_router",
+                    stream_slicer_model,
+                    config,
+                    **kwargs,
+                )
             else:
                 return self._create_component_from_model(  # type: ignore[no-any-return] # Will be created PartitionRouter as stream_slicer_model is model.partition_router
                     model=stream_slicer_model, config=config, stream_name=stream_name or ""
@@ -2886,7 +2907,7 @@ def create_page_increment(
         )

     def create_parent_stream_config(
-        self, model: ParentStreamConfigModel, config: Config, stream_name: str, **kwargs: Any
+        self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any
     ) -> ParentStreamConfig:
         declarative_stream = self._create_component_from_model(
             model.stream,
@@ -3446,6 +3467,11 @@ def create_async_retriever(
         transformations: List[RecordTransformation],
         **kwargs: Any,
     ) -> AsyncRetriever:
+        if model.download_target_requester and not model.download_target_extractor:
+            raise ValueError(
+                f"`download_target_extractor` required if using a `download_target_requester`"
+            )
+
         def _get_download_retriever(
             requester: Requester, extractor: RecordExtractor, _decoder: Decoder
         ) -> SimpleRetriever:
@@ -3603,11 +3629,15 @@ def _get_job_timeout() -> datetime.timedelta:
         status_extractor = self._create_component_from_model(
             model=model.status_extractor, decoder=decoder, config=config, name=name
         )
-        download_target_extractor = self._create_component_from_model(
-            model=model.download_target_extractor,
-            decoder=decoder,
-            config=config,
-            name=name,
+        download_target_extractor = (
+            self._create_component_from_model(
+                model=model.download_target_extractor,
+                decoder=decoder,
+                config=config,
+                name=name,
+            )
+            if model.download_target_extractor
+            else None
         )

         job_repository: AsyncJobRepository = AsyncHttpJobRepository(
@@ -3693,14 +3723,19 @@ def create_spec(self, model: SpecModel, config: Config, **kwargs: Any) -> Spec:
         )

     def create_substream_partition_router(
-        self, model: SubstreamPartitionRouterModel, config: Config, **kwargs: Any
+        self,
+        model: SubstreamPartitionRouterModel,
+        config: Config,
+        *,
+        stream_name: str,
+        **kwargs: Any,
     ) -> SubstreamPartitionRouter:
         parent_stream_configs = []
         if model.parent_stream_configs:
             parent_stream_configs.extend(
                 [
                     self.create_parent_stream_config_with_substream_wrapper(
-                        model=parent_stream_config, config=config, **kwargs
+                        model=parent_stream_config, config=config, stream_name=stream_name, **kwargs
                     )
                     for parent_stream_config in model.parent_stream_configs
                 ]
@@ -3720,7 +3755,7 @@ def create_parent_stream_config_with_substream_wrapper(

         # This flag will be used exclusively for StateDelegatingStream when a parent stream is created
         has_parent_state = bool(
-            self._connector_state_manager.get_stream_state(kwargs.get("stream_name", ""), None)
+            self._connector_state_manager.get_stream_state(stream_name, None)
             if model.incremental_dependency
             else False
         )
@@ -4113,11 +4148,17 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
         )

     def create_grouping_partition_router(
-        self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
+        self,
+        model: GroupingPartitionRouterModel,
+        config: Config,
+        *,
+        stream_name: str,
+        **kwargs: Any,
     ) -> GroupingPartitionRouter:
         underlying_router = self._create_component_from_model(
             model=model.underlying_partition_router,
             config=config,
+            stream_name=stream_name,
             **kwargs,
         )
         if model.group_size < 1:
```
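One subtlety in the new `elif isinstance(stream_slicer_model, dict)` branch above: a partition router nested under a `CustomRetriever` arrives as a plain mapping rather than a parsed model, so the factory threads `stream_name` through `$parameters` by hand before building the nested component. A standalone sketch of just that manipulation (the helper and the example definition are illustrative, not the factory API):

```python
from typing import Any, MutableMapping, Optional


def inject_stream_name(
    stream_slicer_model: MutableMapping[str, Any], stream_name: Optional[str]
) -> MutableMapping[str, Any]:
    """Ensure the raw component definition has a $parameters dict carrying the stream name."""
    params = stream_slicer_model.get("$parameters")
    if not isinstance(params, dict):
        params = {}
        stream_slicer_model["$parameters"] = params
    if stream_name is not None:
        params["stream_name"] = stream_name
    return stream_slicer_model


definition = {"type": "ListPartitionRouter", "values": ["a", "b"], "cursor_field": "section"}
inject_stream_name(definition, "issues")
assert definition["$parameters"] == {"stream_name": "issues"}
```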

airbyte_cdk/sources/declarative/requesters/README.md

Lines changed: 14 additions & 3 deletions
````diff
@@ -1,8 +1,19 @@
+# Download Target and Download Requester
+
+- The `creation_response` and `polling_response` interpolation contexts are always available during the job download step of the process.
+
+- The `download_target` interpolation context is generated by the `download_target_extractor` and made available to the job download step as well.
+  - if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response`
+  - if `download_target_requester` is provided, an additional request will be made to fetch job download targets and `download_target_extractor` will operate on that response
+
+## Some important considerations
+
+- **Note:** If the `download_target_extractor` and `download_target_requester` are not defined, a single job download request will be made without the `download_target` context.
+- **Note:** The `download_target_extractor` is required (not optional) if using a `download_target_requester`.
+
 # AsyncHttpJobRepository sequence diagram

 - Components marked as optional are not required and can be ignored.
-- if `download_target_requester` is not provided, `download_target_extractor` will get urls from the `polling_response`
-- interpolation_context, e.g. `creation_response` or `polling_response` can be obtained from stream_slice

 ```mermaid
 ---
@@ -37,7 +48,7 @@ sequenceDiagram
         UrlRequester -->> AsyncHttpJobRepository: Download URLs

         AsyncHttpJobRepository ->> DownloadRetriever: Download reports
-        DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `url`)
+        DownloadRetriever ->> Reporting Server: Retrieve report data (interpolation_context: `download_target`, `creation_response`, `polling_response`)
         Reporting Server -->> DownloadRetriever: Report data
         DownloadRetriever -->> AsyncHttpJobRepository: Report data
     else Status: Failed
````
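Since the README now enumerates three interpolation contexts, a quick illustration may help. The declarative framework's interpolation is Jinja-based, so a download requester's URL template can reference any of them; this sketch uses plain Jinja2 with hypothetical context values (the CDK wraps this in its own `InterpolatedString` machinery):

```python
from jinja2 import Template

# Hypothetical values the job download step would expose:
context = {
    "download_target": "https://api.example.com/exports/123",
    "creation_response": {"job_id": "123"},
    "polling_response": {"status": "completed"},
}

template = Template("{{ download_target }}?job={{ creation_response['job_id'] }}")
print(template.render(context))  # https://api.example.com/exports/123?job=123
```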

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 29 additions & 6 deletions
```diff
@@ -43,7 +43,7 @@ class AsyncHttpJobRepository(AsyncJobRepository):
     delete_requester: Optional[Requester]
     status_extractor: DpathExtractor
     status_mapping: Mapping[str, AsyncJobStatus]
-    download_target_extractor: DpathExtractor
+    download_target_extractor: Optional[DpathExtractor]

     # timeout for the job to be completed, passed from `polling_job_timeout`
     job_timeout: Optional[timedelta] = None
@@ -213,14 +213,16 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
         """

-        for target_url in self._get_download_targets(job):
+        for download_target in self._get_download_targets(job):
             job_slice = job.job_parameters()
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
                 extra_fields={
                     **job_slice.extra_fields,
-                    "download_target": target_url,
+                    "download_target": download_target,
+                    "creation_response": self._get_creation_response_interpolation_context(job),
+                    "polling_response": self._get_polling_response_interpolation_context(job),
                 },
             )
             for message in self.download_retriever.read_records({}, stream_slice):
@@ -330,9 +332,27 @@ def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
         )

     def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
-        if not self.download_target_requester:
-            url_response = self._polling_job_response_by_id[job.api_job_id()]
-        else:
+        """Returns an iterable of strings to help target requests for downloading async jobs."""
+        # If neither download_target_extractor nor download_target_requester are provided, yield a single empty string
+        # to express the need to make a single download request without any download_target value
+        if not self.download_target_extractor:
+            if not self.download_target_requester:
+                lazy_log(
+                    LOGGER,
+                    logging.DEBUG,
+                    lambda: "No download_target_extractor or download_target_requester provided. Will attempt a single download request without a `download_target`.",
+                )
+                yield ""
+                return
+            else:
+                raise AirbyteTracedException(
+                    internal_message="Must define a `download_target_extractor` when using a `download_target_requester`.",
+                    failure_type=FailureType.config_error,
+                )
+
+        # We have a download_target_extractor, use it to extract the download_target
+        if self.download_target_requester:
+            # if a download_target_requester is defined, we extract from the response of a request specifically for download targets.
             stream_slice: StreamSlice = StreamSlice(
                 partition={},
                 cursor_slice={},
@@ -346,5 +366,8 @@ def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
                 internal_message="Always expect a response or an exception from download_target_requester",
                 failure_type=FailureType.system_error,
             )
+        else:
+            # if no download_target_requester is defined, we extract from the polling response
+            url_response = self._polling_job_response_by_id[job.api_job_id()]

         yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
```
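The rewritten `_get_download_targets` now has three outcomes: no extractor and no requester yields one empty target, a requester without an extractor is a config error, and otherwise targets are extracted from either the dedicated requester's response or the stored polling response. A condensed standalone sketch of that decision tree (stand-in callables, not the repository's real collaborators):

```python
from typing import Callable, Iterable, List, Optional


def get_download_targets(
    extract: Optional[Callable[[dict], List[str]]],
    fetch_targets_response: Optional[Callable[[], dict]],
    polling_response: dict,
) -> Iterable[str]:
    if extract is None:
        if fetch_targets_response is None:
            yield ""  # one download request, no download_target context
            return
        raise ValueError("a download_target_requester requires a download_target_extractor")
    # With an extractor, targets come from the dedicated requester's response if
    # one is configured, otherwise from the stored polling response.
    response = fetch_targets_response() if fetch_targets_response else polling_response
    yield from extract(response)


# Neither configured: a single blank target drives one download request.
print(list(get_download_targets(None, None, {"urls": []})))  # ['']

# Extractor only: targets come straight from the polling response.
print(list(get_download_targets(lambda r: r["urls"], None, {"urls": ["u1", "u2"]})))  # ['u1', 'u2']
```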

bin/generate-component-manifest-dagger.sh

Lines changed: 0 additions & 1 deletion
```diff
@@ -7,5 +7,4 @@

 set -e

-pip install dagger-io==0.13.3
 python bin/generate_component_manifest_files.py
```

debug_manifest/README.md

Lines changed: 9 additions & 1 deletion
```diff
@@ -22,11 +22,19 @@ To configure the debugger in VSCode to run the `debug_manifest`, follow these st
       "request": "launch",
       "console": "integratedTerminal",
       "cwd": "${workspaceFolder}/debug_manifest",
-      "python": "<PATH_TO_CDK_ENV>/bin/python",
+      "python": "<PATH_TO_CDK_ENV>/bin/python", // REPLACE ME
       "module": "debug_manifest",
       "args": [
         // SPECIFY THE COMMAND: [spec, check, discover, read]
         "read",
+        // SPECIFY THE MANIFEST FILE
+        "--manifest-path",
+        // PATH TO THE MANIFEST FILE
+        "resources/manifest.yaml",
+        // SPECIFY A COMPONENTS.PY FILE (OPTIONAL)
+        "--components-path",
+        // PATH TO THE COMPONENTS FILE
+        "resources/components.py",
         // SPECIFY THE CONFIG
         "--config",
         // PATH TO THE CONFIG FILE
```

debug_manifest/debug_manifest.py

Lines changed: 42 additions & 6 deletions
```diff
@@ -3,17 +3,12 @@
 #

 import sys
-from typing import Any, Mapping

 from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
 from airbyte_cdk.sources.declarative.yaml_declarative_source import (
     YamlDeclarativeSource,
 )

-configuration: Mapping[str, Any] = {
-    "path_to_yaml": "resources/manifest.yaml",
-}
-

 def debug_manifest(source: YamlDeclarativeSource, args: list[str]) -> None:
     """
@@ -22,15 +17,56 @@ def debug_manifest(source: YamlDeclarativeSource, args: list[str]) -> None:
     launch(source, args)


+def _register_components_from_file(filepath: str) -> None:
+    """
+    Dynamically load a Python file containing custom component definitions and register it
+    under specific module names in sys.modules to ensure that these classes can be properly
+    resolved during hydration of the manifest yaml file.
+
+    This is a somewhat hacky replacement for the file structure manipulation we do when building
+    connector images to ensure the custom components can be imported.
+    """
+    import importlib.util
+    import sys
+    from pathlib import Path
+
+    components_path = Path(filepath)
+    if not components_path.exists():
+        raise FileNotFoundError(f"Components file not found: {components_path}")
+
+    module_name = "components"
+    sdm_module_name = "source_declarative_manifest.components"
+
+    spec = importlib.util.spec_from_file_location(module_name, components_path)
+    if spec is None or spec.loader is None:
+        raise ImportError(f"Could not load module from {components_path}")
+
+    # Create module and execute code
+    module = importlib.util.module_from_spec(spec)
+
+    # Register then execute the module
+    # we dual-register the module to mirror what is done elsewhere in the CDK
+    sys.modules[module_name] = module
+    sys.modules[sdm_module_name] = module
+
+    spec.loader.exec_module(module)
+
+
 if __name__ == "__main__":
     args = sys.argv[1:]
+    parsed_args = AirbyteEntrypoint.parse_args(args)
+
+    manifest_path = getattr(parsed_args, "manifest_path", None) or "resources/manifest.yaml"
+    components_path = getattr(parsed_args, "components_path", None)
+    if components_path:
+        _register_components_from_file(components_path)
     catalog_path = AirbyteEntrypoint.extract_catalog(args)
     config_path = AirbyteEntrypoint.extract_config(args)
     state_path = AirbyteEntrypoint.extract_state(args)

     debug_manifest(
         YamlDeclarativeSource(
-            path_to_yaml="resources/manifest.yaml",
+            path_to_yaml=manifest_path,
             catalog=YamlDeclarativeSource.read_catalog(catalog_path) if catalog_path else None,
             config=YamlDeclarativeSource.read_config(config_path) if config_path else None,
             state=YamlDeclarativeSource.read_state(state_path) if state_path else None,
```
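After `_register_components_from_file` runs, the same module object is reachable under both names, which is what lets a manifest's `class_name` references resolve during local debugging. A hedged usage sketch (the file path and the component class in the comment are hypothetical):

```python
import sys

_register_components_from_file("resources/components.py")

# Both aliases now point at the same loaded module object:
assert sys.modules["components"] is sys.modules["source_declarative_manifest.components"]

# So a manifest reference like
#   class_name: source_declarative_manifest.components.MyCustomExtractor  # hypothetical class
# can be resolved while debugging locally, just as it is in a built connector image.
```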
