Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
065e737
fix(sql): Add fallback to source_defined_primary_key in CatalogProvider
devin-ai-integration[bot] Jun 27, 2025
821848b
refactor(sql): Simplify primary key fallback logic with one-liner
devin-ai-integration[bot] Jun 28, 2025
565aa64
refactor(test): Parametrize catalog provider tests to reduce duplication
devin-ai-integration[bot] Jun 28, 2025
07de856
docs(sql): Expand docstring for get_primary_keys to explain fallback …
devin-ai-integration[bot] Jun 28, 2025
b4aa7df
fix(format): Apply ruff formatting to docstring changes
devin-ai-integration[bot] Jun 28, 2025
65e8e87
feat(sql): Prioritize source_defined_primary_key and return None when…
devin-ai-integration[bot] Jun 28, 2025
be8d806
feat(sql): Add guard statements for primary key validation in merge o…
devin-ai-integration[bot] Jun 29, 2025
8d1ecae
fix(format): Break long docstring line to meet line length requirements
devin-ai-integration[bot] Jun 29, 2025
625cd1e
fix(cherry-pick me): improve messaging for 'could not import module' …
aaronsteers Jul 3, 2025
74240ab
docs(sql): Clarify that get_primary_keys returns column names, not va…
devin-ai-integration[bot] Jul 3, 2025
820d9e8
tests: improve fast test outputs, skip 'read' tests for destinations
aaronsteers Jul 3, 2025
efd9d7d
Apply suggestions from code review
aaronsteers Jul 3, 2025
aefed0d
fix missing import
aaronsteers Jul 3, 2025
0839137
apply suggestion: rename to 'raise_if_errors'
aaronsteers Jul 3, 2025
3d14724
renames (round two)
aaronsteers Jul 3, 2025
623723f
fix missing declaration
aaronsteers Jul 3, 2025
4a23c35
fix lint issue
aaronsteers Jul 4, 2025
6966f6d
fix verify, improve message output
aaronsteers Jul 4, 2025
51e2ad7
more verbose format output
aaronsteers Jul 4, 2025
7bf86ab
Merge branch 'devin/1751064114-fix-primary-key-fallback' of https://g…
aaronsteers Jul 4, 2025
10f39d2
remove extra space
aaronsteers Jul 4, 2025
fdf2273
Merge branch 'devin/1751064114-fix-primary-key-fallback' into aj/test…
aaronsteers Jul 4, 2025
66df838
always test destination-motherduck
aaronsteers Jul 4, 2025
eb0af5d
Merge branch 'main' into aj/tests/improve-fast-test-outputs
aaronsteers Jul 4, 2025
ff9e842
use proper destination test suite class
aaronsteers Jul 4, 2025
b0f28b7
remove unused import
aaronsteers Jul 4, 2025
0d8c317
fix language tags resolution
aaronsteers Jul 4, 2025
0dd3cd9
use absolute path to detect name
aaronsteers Jul 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/connector-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ jobs:
- connector: source-google-drive
cdk_extra: file-based
- connector: destination-motherduck
cdk_extra: sql
# For now, we mark as 'n/a' to always test this connector
cdk_extra: n/a # change to 'sql' to test less often
# source-amplitude failing for unrelated issue "date too far back"
# e.g. https://github.com/airbytehq/airbyte-python-cdk/actions/runs/16053716569/job/45302638848?pr=639
# - connector: source-amplitude
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/python_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: poetry install --all-extras

# Job-specifc step(s):
# Job-specific step(s):
- name: Run lint check
run: poetry run ruff check .

Expand All @@ -49,9 +49,9 @@ jobs:
- name: Install dependencies
run: poetry install --all-extras

# Job-specifc step(s):
# Job-specific step(s):
- name: Check code format
run: poetry run ruff format --check .
run: poetry run ruff format --diff .

mypy-check:
name: MyPy Check
Expand Down
12 changes: 7 additions & 5 deletions airbyte_cdk/cli/airbyte_cdk/_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@

TEST_FILE_TEMPLATE = '''
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""FAST Airbyte Standard Tests for the {connector_name} source."""
"""FAST Airbyte Standard Tests for the {connector_name} connector."""

#from airbyte_cdk.test.standard_tests import {base_class_name}
from airbyte_cdk.test.standard_tests.util import create_connector_test_suite
Expand All @@ -81,11 +81,13 @@
connector_directory=Path(),
)

# Uncomment the following lines to create a custom test suite class:
#
# class TestSuite({base_class_name}):
# """Test suite for the {connector_name} source.

# This class inherits from SourceTestSuiteBase and implements all of the tests in the suite.

# """Test suite for the `{connector_name}` connector.
#
# This class inherits from `{base_class_name}` and implements all of the tests in the suite.
#
# As long as the class name starts with "Test", pytest will automatically discover and run the
# tests in this class.
# """
Expand Down
51 changes: 51 additions & 0 deletions airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import traceback
from collections import deque
from collections.abc import Generator, Mapping
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from typing import Any, List, Literal, Optional, Union, final, overload
Expand Down Expand Up @@ -50,6 +51,21 @@
from airbyte_cdk.test.models.scenario import ExpectedOutcome


@dataclass
class AirbyteEntrypointException(Exception):
"""Exception raised for errors in the AirbyteEntrypoint execution.

Used to provide details of an Airbyte connector execution failure in the output
captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
convert it to an exception.

Example Usage:
output = EntrypointOutput(...)
if output.errors:
raise output.as_exception()
"""


class EntrypointOutput:
"""A class to encapsulate the output of an Airbyte connector's execution.

Expand All @@ -67,13 +83,15 @@ def __init__(
messages: list[str] | None = None,
uncaught_exception: Optional[BaseException] = None,
*,
command: list[str] | None = None,
message_file: Path | None = None,
) -> None:
if messages is None and message_file is None:
raise ValueError("Either messages or message_file must be provided")
if messages is not None and message_file is not None:
raise ValueError("Only one of messages or message_file can be provided")

self._command = command
self._messages: list[AirbyteMessage] | None = None
self._message_file: Path | None = message_file
if messages:
Expand Down Expand Up @@ -182,6 +200,39 @@ def analytics_messages(self) -> List[AirbyteMessage]:
def errors(self) -> List[AirbyteMessage]:
return self._get_trace_message_by_trace_type(TraceType.ERROR)

def get_formatted_error_message(self) -> str:
"""Returns a human-readable error message with the contents.

If there are no errors, returns an empty string.
"""
errors = self.errors
if not errors:
# If there are no errors, return an empty string.
return ""

result = "Failed to run airbyte command"
result += ": " + " ".join(self._command) if self._command else "."
result += "\n" + "\n".join(
[str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
)
return result

def as_exception(self) -> AirbyteEntrypointException:
"""Convert the output to an exception."""
return AirbyteEntrypointException(self.get_formatted_error_message())

def raise_if_errors(
self,
) -> None:
"""Raise an exception if there are errors in the output.

Otherwise, do nothing.
"""
if not self.errors:
return None

raise self.as_exception()

@property
def catalog(self) -> AirbyteMessage:
catalog = self.get_message_by_types([Type.CATALOG])
Expand Down
29 changes: 4 additions & 25 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,6 @@
)


def _errors_to_str(
entrypoint_output: entrypoint_wrapper.EntrypointOutput,
) -> str:
"""Convert errors from entrypoint output to a string."""
if not entrypoint_output.errors:
# If there are no errors, return an empty string.
return ""

return "\n" + "\n".join(
[
str(error.trace.error).replace(
"\\n",
"\n",
)
for error in entrypoint_output.errors
if error.trace
],
)


@runtime_checkable
class IConnector(Protocol):
"""A connector that can be run in a test scenario.
Expand Down Expand Up @@ -125,9 +105,7 @@ def run_test_job(
expected_outcome=test_scenario.expected_outcome,
)
if result.errors and test_scenario.expected_outcome.expect_success():
raise AssertionError(
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)
raise result.as_exception()

if verb == "check":
# Check is expected to fail gracefully without an exception.
Expand All @@ -137,7 +115,7 @@ def run_test_job(
"Expected exactly one CONNECTION_STATUS message. Got "
f"{len(result.connection_status_messages)}:\n"
+ "\n".join([str(msg) for msg in result.connection_status_messages])
+ _errors_to_str(result)
+ result.get_formatted_error_message()
)
if test_scenario.expected_outcome.expect_exception():
conn_status = result.connection_status_messages[0].connectionStatus
Expand All @@ -161,7 +139,8 @@ def run_test_job(

if test_scenario.expected_outcome.expect_success():
assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
f"Test job failed with {len(result.errors)} error(s): \n"
+ result.get_formatted_error_message()
)

return result
2 changes: 1 addition & 1 deletion airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None:
specific connector class to be tested.
"""
connector_root = cls.get_connector_root_dir()
connector_name = connector_root.absolute().name
connector_name = cls.connector_name

expected_module_name = connector_name.replace("-", "_").lower()
expected_class_name = connector_name.replace("-", "_").title().replace("_", "")
Expand Down
97 changes: 40 additions & 57 deletions airbyte_cdk/test/standard_tests/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dataclasses import asdict
from pathlib import Path
from subprocess import CompletedProcess, SubprocessError
from typing import Literal
from typing import Literal, cast

import orjson
import pytest
Expand All @@ -25,19 +25,18 @@
DestinationSyncMode,
SyncMode,
)
from airbyte_cdk.models.airbyte_protocol_serializers import (
AirbyteCatalogSerializer,
AirbyteStreamSerializer,
)
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.models import ConnectorTestScenario
from airbyte_cdk.test.utils.reading import catalog
from airbyte_cdk.utils.connector_paths import (
ACCEPTANCE_TEST_CONFIG,
find_connector_root,
)
from airbyte_cdk.utils.docker import build_connector_image, run_docker_command
from airbyte_cdk.utils.docker import (
build_connector_image,
run_docker_airbyte_command,
run_docker_command,
)


class DockerConnectorTestSuite:
Expand All @@ -55,6 +54,17 @@ def get_connector_root_dir(cls) -> Path:
"""Get the root directory of the connector."""
return find_connector_root([cls.get_test_class_dir(), Path.cwd()])

@classproperty
def connector_name(self) -> str:
"""Get the name of the connector."""
connector_root = self.get_connector_root_dir()
return connector_root.absolute().name

@classmethod
def is_destination_connector(cls) -> bool:
"""Check if the connector is a destination."""
return cast(str, cls.connector_name).startswith("destination-")

@classproperty
def acceptance_test_config_path(cls) -> Path:
"""Get the path to the acceptance test config file."""
Expand Down Expand Up @@ -145,23 +155,16 @@ def test_docker_image_build_and_spec(
no_verify=False,
)

try:
result: CompletedProcess[str] = run_docker_command(
[
"docker",
"run",
"--rm",
connector_image,
"spec",
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
)
except SubprocessError as ex:
raise AssertionError(
f"Failed to run `spec` command in docker image {connector_image!r}. Error: {ex!s}"
) from None
_ = run_docker_airbyte_command(
[
"docker",
"run",
"--rm",
connector_image,
"spec",
],
raise_if_errors=True,
)

@pytest.mark.skipif(
shutil.which("docker") is None,
Expand Down Expand Up @@ -203,7 +206,7 @@ def test_docker_image_build_and_check(
with scenario.with_temp_config_file(
connector_root=connector_root,
) as temp_config_file:
_ = run_docker_command(
_ = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -215,9 +218,7 @@ def test_docker_image_build_and_check(
"--config",
container_config_path,
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
raise_if_errors=True,
)

@pytest.mark.skipif(
Expand All @@ -242,6 +243,9 @@ def test_docker_image_build_and_read(
the local docker image cache using `docker image prune -a` command.
- If the --connector-image arg is provided, it will be used instead of building the image.
"""
if self.is_destination_connector():
pytest.skip("Skipping read test for destination connector.")

if scenario.expected_outcome.expect_exception():
pytest.skip("Skipping (expected to fail).")

Expand Down Expand Up @@ -295,7 +299,7 @@ def test_docker_image_build_and_read(
) as temp_dir_str,
):
temp_dir = Path(temp_dir_str)
discover_result = run_docker_command(
discover_result = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -307,20 +311,12 @@ def test_docker_image_build_and_read(
"--config",
container_config_path,
],
check=True, # Raise an error if the command fails
capture_stderr=True,
capture_stdout=True,
raise_if_errors=True,
)
parsed_output = EntrypointOutput(messages=discover_result.stdout.splitlines())
try:
catalog_message = parsed_output.catalog # Get catalog message
assert catalog_message.catalog is not None, "Catalog message missing catalog."
discovered_catalog: AirbyteCatalog = parsed_output.catalog.catalog
except Exception as ex:
raise AssertionError(
f"Failed to load discovered catalog from {discover_result.stdout}. "
f"Error: {ex!s}"
) from None

catalog_message = discover_result.catalog # Get catalog message
assert catalog_message.catalog is not None, "Catalog message missing catalog."
discovered_catalog: AirbyteCatalog = catalog_message.catalog
if not discovered_catalog.streams:
raise ValueError(
f"Discovered catalog for connector '{connector_name}' is empty. "
Expand Down Expand Up @@ -355,7 +351,7 @@ def test_docker_image_build_and_read(
configured_catalog_path.write_text(
orjson.dumps(asdict(configured_catalog)).decode("utf-8")
)
read_result: CompletedProcess[str] = run_docker_command(
read_result: EntrypointOutput = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -371,18 +367,5 @@ def test_docker_image_build_and_read(
"--catalog",
container_catalog_path,
],
check=False,
capture_stderr=True,
capture_stdout=True,
raise_if_errors=True,
)
if read_result.returncode != 0:
raise AssertionError(
f"Failed to run `read` command in docker image {connector_image!r}. "
"\n-----------------"
f"EXIT CODE: {read_result.returncode}\n"
"STDERR:\n"
f"{read_result.stderr}\n"
f"STDOUT:\n"
f"{read_result.stdout}\n"
"\n-----------------"
) from None
Loading
Loading