feat: additional interpolation contexts for download step of async job #757
Conversation
… format python files after generating pydantic models, upgrade dagger version, install dagger normally as a dev poetry dependency, allow specifying a custom manifest file path in IDE debug configuration.
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@dbgold17/provide-additional-context-to-async-job-steps#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch dbgold17/provide-additional-context-to-async-job-steps
```

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:
Pull Request Overview

This PR adds interpolation contexts for creation_response and polling_response to the download step of async jobs, enabling connectors to use values from earlier requests when downloading job results. It also makes the download_target_extractor optional to support simpler APIs that don't require extracting download targets.

Key changes:
- Added creation_response and polling_response interpolation contexts to download requests
- Made download_target_extractor optional with fallback behavior for single download requests
- Updated debug manifest to support configurable manifest paths via command line arguments
Reviewed Changes

Copilot reviewed 8 out of 10 changed files in this pull request and generated 2 comments.

| File | Description |
|---|---|
| airbyte_cdk/sources/declarative/requesters/http_job_repository.py | Added new interpolation contexts and optional download_target_extractor logic |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Made download_target_extractor optional in model definition |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Updated factory to handle optional download_target_extractor |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Updated schema to make download_target_extractor optional |
| debug_manifest/debug_manifest.py | Added support for configurable manifest path via command line |
| debug_manifest/README.md | Updated documentation for new manifest path option |
| pyproject.toml | Added dagger-io dependency and ruff-fix to build sequence |
| airbyte_cdk/sources/declarative/requesters/README.md | Updated documentation for new interpolation contexts |
Co-authored-by: Copilot <[email protected]>
📝 Walkthrough

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Connector
    participant AsyncRetriever
    participant AsyncHttpJobRepository as Repo
    participant API
    Connector->>AsyncRetriever: read()
    AsyncRetriever->>Repo: create_job()
    Repo->>API: POST /jobs
    API-->>Repo: creation_response
    Repo-->>AsyncRetriever: creation_response
    loop poll until ready
        AsyncRetriever->>Repo: poll_job()
        Repo->>API: GET /jobs/{id}/status
        API-->>Repo: polling_response
    end
    rect rgba(220,235,255,0.4)
        note over Repo: Determine download targets (three paths)
        alt extractor present, no requester
            Repo->>Repo: extract targets from polling_response
        else extractor + requester present
            Repo->>API: Request download targets via requester
            API-->>Repo: url_response
            Repo->>Repo: extract targets from url_response
        else neither present (fallback)
            Repo->>Repo: yield single empty target ""
        end
    end
    loop for each download_target
        Repo-->>AsyncRetriever: download call with extra_fields {download_target, creation_response, polling_response}
        AsyncRetriever->>API: GET/stream download using extra_fields
        API-->>AsyncRetriever: records
    end
```
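The three-path target resolution shown in the diagram can be sketched in plain Python. This is an illustrative model only; the function and parameter names are assumptions, not the CDK's actual API.

```python
# Illustrative sketch of the three download-target paths; names are
# assumptions, not the CDK's real API.
from typing import Callable, Iterable, Iterator, Optional


def get_download_targets(
    polling_response: dict,
    extractor: Optional[Callable[[dict], Iterable[str]]] = None,
    requester: Optional[Callable[[], dict]] = None,
) -> Iterator[str]:
    if requester is not None and extractor is None:
        # Misconfiguration: a target requester is useless without an extractor.
        raise ValueError("download_target_requester requires download_target_extractor")
    if extractor is not None and requester is None:
        # Path 1: extract targets directly from the polling response.
        yield from extractor(polling_response)
    elif extractor is not None and requester is not None:
        # Path 2: fetch an intermediate url_response, then extract targets from it.
        yield from extractor(requester())
    else:
        # Path 3 (fallback): yield a single empty target so exactly one
        # download request is made, relying on other interpolation contexts.
        yield ""
```

In the fallback path, the download requester is expected to build its request entirely from the creation_response and polling_response contexts.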
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Would you like me to propose a short unit-test checklist covering extractor-present, requester-present, both-absent (fallback), and requester-without-extractor (error) cases? wdyt?

Pre-merge checks: ✅ 3 passed
… of github.com:airbytehq/airbyte-python-cdk into dbgold17/provide-additional-context-to-async-job-steps
Actionable comments posted: 5
♻️ Duplicate comments (1)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)

Lines 360-365: Guard against None download_target_extractor (avoids AttributeError and clarifies contract)

If download_target_requester is provided but download_target_extractor is None (or when extracting directly from polling_response without a requester), this will raise an AttributeError at runtime. Can we fail fast with a clear error, wdyt?

```diff
-        yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
+        if not self.download_target_extractor:
+            raise AirbyteTracedException(
+                internal_message=(
+                    "download_target_extractor is required when using download_target_requester "
+                    "or when extracting download targets from polling_response."
+                ),
+                failure_type=FailureType.system_error,
+            )
+        yield from self.download_target_extractor.extract_records(url_response)
```
🧹 Nitpick comments (3)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

Line 3817: Clarify optionality and cross-context usage for download_target_extractor

Would you be open to tightening the description so it explicitly calls out that this extractor is optional and that, when omitted, download_requester can interpolate from creation_response/polling_response/download_target, wdyt?

```diff
-      description: Responsible for fetching the information needed to download the completed job from the polling HTTP response.
+      description: Optional. Extracts the information (e.g., URL string) needed to download the completed job from the polling HTTP response. When omitted, the download_requester may interpolate required values from creation_response, polling_response, or download_target.
```

Separately, since the goal is "additional interpolation contexts for the download step," should we also allow creation_response, polling_response, and download_target in HttpRequester.request_parameters, request_headers, and request_body interpolation contexts to cover APIs that require these values in params/headers/body (not just URL/path), wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

Lines 2853-2855: Add unit tests for AsyncRetriever.fetch_records() when download_target_extractor is None

Verifies that _get_download_targets falls back to a single empty-string download_target when no extractor is provided, preserving backward compatibility, wdyt?

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)
Lines 347-353: Include creation_response in the download_target_requester context too?

To keep parity with the download request and the PR intent, should we also pass creation_response here so connectors can derive targets from either creation or polling responses, wdyt?

```diff
         stream_slice: StreamSlice = StreamSlice(
             partition={},
             cursor_slice={},
             extra_fields={
-                "polling_response": self._get_polling_response_interpolation_context(job),
+                "creation_response": self._get_creation_response_interpolation_context(job),
+                "polling_response": self._get_polling_response_interpolation_context(job),
             },
         )
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (5)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
- airbyte_cdk/sources/declarative/requesters/README.md (2 hunks)
- airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4 hunks)
🧰 Additional context used

🧠 Learnings (1)

📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
PR: airbytehq/airbyte-python-cdk#0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.

Applied to files:
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py

🧬 Code graph analysis (2)

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
- airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (1): DpathExtractor (18-86)
- unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1): CustomRecordExtractor (3601-3606)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1): DpathExtractor (681-694)
- airbyte_cdk/sources/declarative/async_job/job.py (2): job_parameters (46-47), api_job_id (32-33)
- airbyte_cdk/sources/types.py (4): StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117)
- airbyte_cdk/logger.py (1): lazy_log (55-60)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/requesters/http_job_repository.py
[error] 333-333: ruff format --diff detected formatting changes (1 file would be reformatted) during the 'poetry run ruff format --diff .' step. Trailing whitespace found on a line in airbyte_cdk/sources/declarative/requesters/http_job_repository.py; run 'poetry run ruff format' to fix.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (1)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)

Lines 216-226: Interpolation context enrichment looks good

Adding download_target, creation_response, and polling_response into extra_fields matches the PR goal and should unlock the intended templating in the download requester. Nice. Wdyt?
Looks good pending unit tests, a check in model_to_component_factory, and docs.
Actionable comments posted: 0
♻️ Duplicate comments (1)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)

Line 46: Make Optional field actually optional with a default.

Can we default download_target_extractor to None to reflect its Optional type and avoid breaking existing instantiations, wdyt?

```diff
-    download_target_extractor: Optional[DpathExtractor]
+    download_target_extractor: Optional[DpathExtractor] = None
```
🧹 Nitpick comments (5)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (5)

Lines 216-227: Avoid recomputing interpolation contexts per target (micro perf).

We rebuild creation/polling contexts on every iteration. Precomputing once per job reduces JSON parsing overhead. Shall we refactor, wdyt?

```diff
-        for download_target in self._get_download_targets(job):
-            job_slice = job.job_parameters()
+        job_slice = job.job_parameters()
+        creation_ctx = self._get_creation_response_interpolation_context(job)
+        polling_ctx = self._get_polling_response_interpolation_context(job)
+        for download_target in self._get_download_targets(job):
             stream_slice = StreamSlice(
                 partition=job_slice.partition,
                 cursor_slice=job_slice.cursor_slice,
                 extra_fields={
                     **job_slice.extra_fields,
                     "download_target": download_target,
-                    "creation_response": self._get_creation_response_interpolation_context(job),
-                    "polling_response": self._get_polling_response_interpolation_context(job),
+                    "creation_response": creation_ctx,
+                    "polling_response": polling_ctx,
                 },
             )
```
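The hoisting idea can be shown in isolation. In this sketch, json.loads stands in for the repository's context-building helpers (an assumption), and the function name is invented; the point is simply that the per-job contexts are built once, outside the per-target loop.

```python
# Standalone sketch of the micro-optimization: build the per-job contexts
# once, then reuse them for every download target. json.loads stands in
# for the _get_*_interpolation_context helpers.
import json
from typing import Iterable, Iterator


def build_download_slices(
    targets: Iterable[str], creation_raw: str, polling_raw: str
) -> Iterator[dict]:
    creation_ctx = json.loads(creation_raw)  # parsed once per job...
    polling_ctx = json.loads(polling_raw)
    for target in targets:  # ...not once per target
        yield {
            "download_target": target,
            "creation_response": creation_ctx,
            "polling_response": polling_ctx,
        }
```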
Line 353: Typo in comment.

Minor nit: s/donload_target/download_target/, wdyt?

```diff
-        # We have a download_target_extractor, use it to extract the donload_target
+        # We have a download_target_extractor, use it to extract the download_target
```
Lines 354-363: Include creation_response in the requester context too?

For consistency with the download request and to unlock more templates, should we also pass creation_response here, wdyt?

```diff
         stream_slice: StreamSlice = StreamSlice(
             partition={},
             cursor_slice={},
             extra_fields={
                 "polling_response": self._get_polling_response_interpolation_context(job),
+                "creation_response": self._get_creation_response_interpolation_context(job),
             },
         )
```
Lines 370-372: Guard against missing polling response (nicer error).

If fetch_records is called before a COMPLETED poll stores the response, this KeyError will be cryptic. Shall we raise a traced config/system error with context instead, wdyt?

```diff
-        url_response = self._polling_job_response_by_id[job.api_job_id()]
+        job_id = job.api_job_id()
+        if job_id not in self._polling_job_response_by_id:
+            raise AirbyteTracedException(
+                internal_message=f"No polling response found for job_id={job_id}. Ensure the job reached COMPLETED before fetching records.",
+                failure_type=FailureType.system_error,
+            )
+        url_response = self._polling_job_response_by_id[job_id]
```
Lines 373-374: Validate extractor output is strings (early, clear failure).

Do we want to assert the extractor returns strings to avoid subtle template failures downstream, wdyt?

```diff
-        yield from self.download_target_extractor.extract_records(url_response)  # type: ignore # we expect download_target_extractor to always return list of strings
+        targets = list(self.download_target_extractor.extract_records(url_response))  # type: ignore
+        if not all(isinstance(t, str) for t in targets):
+            raise AirbyteTracedException(
+                internal_message="`download_target_extractor` must yield strings (download targets). Check your extractor path.",
+                failure_type=FailureType.config_error,
+            )
+        yield from targets
```
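The string check can be factored into a small standalone helper. The name and error type here are hypothetical (a plain ValueError stands in for AirbyteTracedException); the sketch only demonstrates the materialize-then-validate pattern.

```python
# Hypothetical helper illustrating the early validation: materialize the
# extractor's output and fail fast if any target is not a string.
from typing import Iterable, List


def validate_targets(records: Iterable[object]) -> List[str]:
    targets = list(records)
    bad = [t for t in targets if not isinstance(t, str)]
    if bad:
        raise ValueError(
            "`download_target_extractor` must yield strings (download targets); "
            f"got {type(bad[0]).__name__}"
        )
    return targets  # all elements verified to be str
```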
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/requesters/http_job_repository.py (4 hunks)
🧰 Additional context used

🧬 Code graph analysis (1)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (6)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2): DpathExtractor (681-694), FailureType (741-744)
- airbyte_cdk/sources/declarative/async_job/job.py (2): job_parameters (46-47), api_job_id (32-33)
- airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1): stream_slice (88-89)
- airbyte_cdk/sources/types.py (4): StreamSlice (75-169), partition (99-104), cursor_slice (107-112), extra_fields (115-117)
- airbyte_cdk/logger.py (1): lazy_log (55-60)
- airbyte_cdk/utils/traced_exception.py (1): AirbyteTracedException (25-145)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Analyze (python)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (1)

airbyte_cdk/sources/declarative/requesters/http_job_repository.py (1)

Lines 335-352: Fallback behavior for generator is correct; config guard looks good.

Yielding "" fixes the generator-return bug, and the explicit config_error when a requester is set without an extractor clarifies misconfiguration. LGTM.
Actionable comments posted: 0
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)

Lines 3789-3912: Enforce config constraint in-schema: require extractor when download_target_requester is set

The factory enforces this at runtime, but we can codify it in the schema for earlier feedback and better UX. Add a property dependency so that when download_target_requester is present, download_target_extractor becomes required. Wdyt?

```diff
   AsyncRetriever:
     title: Asynchronous Retriever
     description: "Retrieves records by Asynchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router."
     type: object
     required:
       - type
       - record_selector
       - status_mapping
       - creation_requester
       - polling_requester
       - download_requester
       - status_extractor
     properties:
       type:
         type: string
         enum: [AsyncRetriever]
@@
       download_decoder:
         title: Download HTTP Response Format
         description: Component decoding the download response so records can be extracted.
         anyOf:
           - "$ref": "#/definitions/CsvDecoder"
           - "$ref": "#/definitions/GzipDecoder"
           - "$ref": "#/definitions/JsonDecoder"
           - "$ref": "#/definitions/JsonlDecoder"
           - "$ref": "#/definitions/IterableDecoder"
           - "$ref": "#/definitions/XmlDecoder"
           - "$ref": "#/definitions/ZipfileDecoder"
           - "$ref": "#/definitions/CustomDecoder"
+    dependencies:
+      download_target_requester:
+        - download_target_extractor
```
Lines 1618-1620: Rename additional_properties to additionalProperties in declarative_component_schema.yaml?

draft-07 validators ignore the snake_case additional_properties key, so this silently drops schema intent. Found occurrences at airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1619 and :1826.

```diff
   $parameters:
     type: object
-    additional_properties: true
+    additionalProperties: true
```

wdyt?
🧹 Nitpick comments (1)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

Lines 1594-1600: Tighten description and avoid AsyncRetriever coupling in FileUploader

Would you reword this to avoid implying an AsyncRetriever-only flow and to clarify singular/plural, e.g. "Responsible for extracting the final result URL(s) to download the file."? Wdyt?

```diff
-      description: Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.
+      description: Responsible for extracting the final result URL(s) to download the file.
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (3)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1 hunks)
- airbyte_cdk/sources/declarative/requesters/README.md (2 hunks)

🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte_cdk/sources/declarative/requesters/README.md
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-shopify
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Analyze (python)
@pnilan ready for re-review. I decided that unit tests were not necessary for this change, given that the scope of the existing unit tests for the async workflow is pretty high-level.
Resolves https://github.com/airbytehq/oncall/issues/8760
base branch PR (would love a review): #756
This PR adds creation_response and polling_response interpolation contexts to the download requester of the AsyncRetriever. This enables connectors to use values returned by the earlier requests that create or poll for async jobs in the request made to actually retrieve data from a completed job. The impetus for this is to support the TikTok business API for change logs (documentation).

In order to make this work, I had to also make the download_target_extractor optional. This is because the current logic assumes that download_targets, which are typically a list of URLs to download completed jobs, are always present. If there is more than one download target, the CDK will make one download request per target, passing each target in as interpolation context. In this case, we do not want this behavior, and it risks duplicating requested data if only one request is required to fetch all of the data. Instead, we can use values from the creation_response or polling_response to make a single download request. See my notes in airbyte_cdk/sources/declarative/requesters/README.md for a full explanation of the new logic.

An alternative implementation would be to allow the download_target to be constructed with the inclusion of creation_response or polling_response interpolation contexts. This may enable us to make multiple download requests based on the values of those variables, but given we aren't aware of a use case for this at the moment, I decided to keep things simpler for now.

Accompanying Documentation Change PR
airbytehq/airbyte#66181
Still TODO: