Skip to content

Commit 722c7e0

Browse files
committed
file-based: add tests for identities stream
1 parent c94b704 commit 722c7e0

File tree

2 files changed

+95
-3
lines changed

2 files changed

+95
-3
lines changed

airbyte_cdk/sources/streams/permissions/identities.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,14 @@
1515
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
1616
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1717

18-
DEFAULT_IDENTITIES_STREAM_NAME = "identities"
19-
2018

2119
class Identities(Stream, ABC):
2220
"""
2321
The identities stream. A full refresh stream to sync identities from a certain domain.
2422
The load_identity_groups method manage the logic to get such data.
2523
"""
2624

27-
IDENTITIES_STREAM_NAME = DEFAULT_IDENTITIES_STREAM_NAME
25+
IDENTITIES_STREAM_NAME = "identities"
2826

2927
is_resumable = False
3028

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import unittest
6+
from datetime import datetime, timezone
7+
from unittest.mock import Mock
8+
9+
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
10+
from airbyte_cdk.sources.file_based.exceptions import (
11+
FileBasedErrorsCollector,
12+
)
13+
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
14+
from airbyte_cdk.sources.file_based.stream import FileIdentities
15+
from airbyte_protocol_dataclasses.models import SyncMode
16+
17+
18+
class MockFormat:
19+
pass
20+
21+
22+
class IdentitiesFileBasedStreamTest(unittest.TestCase):
23+
_NOW = datetime(2022, 10, 22, tzinfo=timezone.utc)
24+
_A_RECORD = {
25+
"id": "923496ab-3eee-47d2-a824-b237e630082a",
26+
"remote_id": "[email protected]",
27+
"name": "user ond",
28+
"email_address": "[email protected]",
29+
"member_email_addresses": ["[email protected]", "[email protected]"],
30+
"type": "user",
31+
"modified_at": "2025-02-12T23:06:45.304942+00:00",
32+
}
33+
34+
_GROUP_RECORD = {
35+
"id": "ebf97e50-a010-4daa-b1ce-b47494e7fb10",
36+
"remote_id": "[email protected]",
37+
"name": "team_work",
38+
"email_address": "[email protected]",
39+
"member_email_addresses": ["[email protected]", "[email protected]"],
40+
"type": "group",
41+
"modified_at": "2025-02-12T23:06:45.604572+00:00",
42+
}
43+
44+
_IDENTITIES_SCHEMA = {
45+
"type": "object",
46+
"properties": {
47+
"id": {"type": "string"},
48+
"remote_id": {"type": "string"},
49+
"parent_id": {"type": ["null", "string"]},
50+
"name": {"type": ["null", "string"]},
51+
"description": {"type": ["null", "string"]},
52+
"email_address": {"type": ["null", "string"]},
53+
"member_email_addresses": {"type": ["null", "array"]},
54+
"type": {"type": "string"},
55+
"modified_at": {"type": "string"},
56+
},
57+
}
58+
59+
def setUp(self) -> None:
60+
self._catalog_schema = Mock()
61+
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
62+
self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy)
63+
64+
self._stream_reader.identities_schema = self._IDENTITIES_SCHEMA
65+
66+
self._stream = FileIdentities(
67+
catalog_schema=self._catalog_schema,
68+
stream_reader=self._stream_reader,
69+
discovery_policy=self._discovery_policy,
70+
errors_collector=FileBasedErrorsCollector(),
71+
)
72+
73+
def test_when_read_records_then_return_records(self) -> None:
74+
self._stream_reader.load_identity_groups.return_value = [self._A_RECORD, self._GROUP_RECORD]
75+
messages = list(self._stream.read_records(SyncMode.full_refresh))
76+
assert list(map(lambda message: message.record.data, messages)) == [
77+
self._A_RECORD,
78+
self._GROUP_RECORD,
79+
]
80+
81+
def test_when_getting_schema(self):
82+
returned_schema = self._stream.get_json_schema()
83+
assert returned_schema == self._IDENTITIES_SCHEMA
84+
85+
def test_when_read_records_and_raise_exception(self) -> None:
86+
self._stream_reader.load_identity_groups.side_effect = Exception(
87+
"Identities retrieval failed"
88+
)
89+
90+
messages = list(self._stream.read_records(SyncMode.full_refresh))
91+
assert (
92+
messages[0].log.message
93+
== "Error trying to read identities: Identities retrieval failed stream=identities"
94+
)

0 commit comments

Comments
 (0)