Skip to content

Commit 35e0e68

Browse files
committed
file-based: add identities stream and rename acl toggle
1 parent edd6f69 commit 35e0e68

File tree

10 files changed

+274
-45
lines changed

10 files changed

+274
-45
lines changed

airbyte_cdk/sources/file_based/config/abstract_file_based_spec.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111

1212
from airbyte_cdk import OneOfOptionConfig
1313
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
14+
from airbyte_cdk.sources.file_based.config.identities_based_stream_config import (
15+
IdentitiesStreamConfig,
16+
)
1417
from airbyte_cdk.sources.utils import schema_helpers
1518

1619

@@ -22,12 +25,17 @@ class Config(OneOfOptionConfig):
2225

2326
delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)
2427

25-
sync_metadata: bool = Field(
26-
title="Make stream sync files metadata",
27-
description="If enabled, streams will sync files metadata instead of files data.",
28+
sync_acl_permissions: bool = Field(
29+
title="Include ACL Permissions",
30+
description="Joins Document allowlists to each stream.",
2831
default=False,
2932
airbyte_hidden=True,
3033
)
34+
identities: Optional[IdentitiesStreamConfig] = Field(
35+
title="Identities configuration",
36+
description="Configuration for identities",
37+
airbyte_hidden=True,
38+
)
3139

3240

3341
class DeliverRawFiles(BaseModel):
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from pydantic.v1 import BaseModel, Field
2+
from typing import Literal
3+
4+
5+
class IdentitiesStreamConfig(BaseModel):
6+
name: Literal["identities"] = Field("identities", const=True, airbyte_hidden=True)
7+
domain: str = Field(title="Domain", description="The domain of the identities.")
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import uuid
6+
from datetime import datetime
7+
from enum import Enum
8+
from pydantic.v1 import BaseModel
9+
10+
11+
class RemoteFileIdentityType(Enum):
12+
USER = "user"
13+
GROUP = "group"
14+
15+
16+
class RemoteFileIdentity(BaseModel):
17+
id: uuid.UUID
18+
remote_id: str
19+
parent_id: str | None = None
20+
name: str | None = None
21+
description: str | None = None
22+
email_address: str | None = None
23+
member_email_addresses: list[str] | None = None
24+
type: RemoteFileIdentityType
25+
modified_at: datetime
26+
27+
28+
class RemoteFilePermissions(BaseModel):
29+
id: str
30+
file_path: str
31+
allowed_identity_remote_ids: list[str] | None = None
32+
denied_identity_remote_ids: list[str] | None = None
33+
publicly_accessible: bool = False

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
FileBasedStreamConfig,
3434
ValidationPolicy,
3535
)
36+
from airbyte_cdk.sources.file_based.config.identities_based_stream_config import (
37+
IdentitiesStreamConfig,
38+
)
3639
from airbyte_cdk.sources.file_based.discovery_policy import (
3740
AbstractDiscoveryPolicy,
3841
DefaultDiscoveryPolicy,
@@ -49,7 +52,11 @@
4952
DEFAULT_SCHEMA_VALIDATION_POLICIES,
5053
AbstractSchemaValidationPolicy,
5154
)
52-
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
55+
from airbyte_cdk.sources.file_based.stream import (
56+
AbstractFileBasedStream,
57+
DefaultFileBasedStream,
58+
IdentitiesStream,
59+
)
5360
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
5461
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
5562
AbstractConcurrentFileBasedCursor,
@@ -157,13 +164,17 @@ def check_connection(
157164
errors = []
158165
tracebacks = []
159166
for stream in streams:
167+
if isinstance(stream, IdentitiesStream):
168+
# Probably need to check identities endpoint/api access but will skip for now.
169+
continue
160170
if not isinstance(stream, AbstractFileBasedStream):
161171
raise ValueError(f"Stream {stream} is not a file-based stream.")
162172
try:
163173
parsed_config = self._get_parsed_config(config)
164174
availability_method = (
165175
stream.availability_strategy.check_availability
166-
if self._use_file_transfer(parsed_config) or self._sync_metadata(parsed_config)
176+
if self._use_file_transfer(parsed_config)
177+
or self._sync_acl_permissions(parsed_config)
167178
else stream.availability_strategy.check_availability_and_parsability
168179
)
169180
(
@@ -289,6 +300,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
289300
)
290301

291302
streams.append(stream)
303+
304+
if self._add_identities_stream(parsed_config):
305+
identities_stream = self._make_identities_stream(
306+
stream_config=parsed_config.delivery_method.identities
307+
)
308+
streams.append(identities_stream)
292309
return streams
293310

294311
except ValidationError as exc:
@@ -312,7 +329,19 @@ def _make_default_stream(
312329
cursor=cursor,
313330
use_file_transfer=self._use_file_transfer(parsed_config),
314331
preserve_directory_structure=self._preserve_directory_structure(parsed_config),
315-
sync_metadata=self._sync_metadata(parsed_config),
332+
sync_acl_permissions=self._sync_acl_permissions(parsed_config),
333+
)
334+
335+
def _make_identities_stream(
336+
self,
337+
stream_config: IdentitiesStreamConfig,
338+
) -> Stream:
339+
return IdentitiesStream(
340+
config=stream_config,
341+
catalog_schema=self.stream_schemas.get(stream_config.name),
342+
stream_reader=self.stream_reader,
343+
discovery_policy=self.discovery_policy,
344+
errors_collector=self.errors_collector,
316345
)
317346

318347
def _get_stream_from_catalog(
@@ -419,11 +448,19 @@ def _preserve_directory_structure(parsed_config: AbstractFileBasedSpec) -> bool:
419448
return True
420449

421450
@staticmethod
422-
def _sync_metadata(parsed_config: AbstractFileBasedSpec) -> bool:
451+
def _sync_acl_permissions(parsed_config: AbstractFileBasedSpec) -> bool:
423452
if (
424453
FileBasedSource._use_records_transfer(parsed_config)
425-
and hasattr(parsed_config.delivery_method, "sync_metadata")
426-
and parsed_config.delivery_method.sync_metadata is not None
454+
and hasattr(parsed_config.delivery_method, "sync_acl_permissions")
455+
and parsed_config.delivery_method.sync_acl_permissions is not None
427456
):
428-
return parsed_config.delivery_method.sync_metadata
457+
return parsed_config.delivery_method.sync_acl_permissions
429458
return False
459+
460+
@staticmethod
461+
def _add_identities_stream(parsed_config: AbstractFileBasedSpec) -> bool:
462+
return (
463+
FileBasedSource._sync_acl_permissions(parsed_config)
464+
and parsed_config.delivery_method.identities is not None
465+
and parsed_config.delivery_method.identities.domain
466+
)

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,14 @@ def preserve_directory_structure(self) -> bool:
155155
return self.config.delivery_method.preserve_directory_structure
156156
return True
157157

158-
def sync_metadata(self) -> bool:
158+
def sync_acl_permissions(self) -> bool:
159159
if (
160160
self.config
161161
and self.use_records_transfer()
162-
and hasattr(self.config.delivery_method, "sync_metadata")
163-
and self.config.delivery_method.sync_metadata is not None
162+
and hasattr(self.config.delivery_method, "sync_acl_permissions")
163+
and self.config.delivery_method.sync_acl_permissions is not None
164164
):
165-
return self.config.delivery_method.sync_metadata
165+
return self.config.delivery_method.sync_acl_permissions
166166
return False
167167

168168
@abstractmethod
@@ -203,25 +203,16 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li
203203
absolute_file_path = path.abspath(local_file_path)
204204
return [file_relative_path, local_file_path, absolute_file_path]
205205

206-
def get_file_metadata(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
206+
def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]:
207207
"""
208208
This is required for connectors that will support syncing
209-
metadata from files.
209+
ACL Permissions from files.
210210
"""
211211
return {}
212212

213-
def get_metadata_schema(self) -> Dict[str, Any]:
214-
""" "
215-
Base schema to emit metadata records for a file,
216-
override in stream reader implementation if the requirements
217-
are different.
218-
"""
219-
return {
220-
"type": "object",
221-
"properties": {
222-
"id": {"type": "string"},
223-
"file_path": {"type": "string"},
224-
"allowed_identity_remote_ids": {"type": "array", "items": "string"},
225-
"is_public": {"type": "boolean"},
226-
},
227-
}
213+
def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
214+
"""
215+
This is required for connectors that will support syncing
216+
identities.
217+
"""
218+
yield {}

airbyte_cdk/sources/file_based/schema_helpers.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,31 @@
2323
"properties": {"data": {"type": "object"}, "file": {"type": "object"}},
2424
}
2525

26+
remote_file_permissions_schema = {
27+
"type": "object",
28+
"properties": {
29+
"id": {"type": "string"},
30+
"file_path": {"type": "string"},
31+
"allowed_identity_remote_ids": {"type": "array", "items": "string"},
32+
"publicly_accessible": {"type": "boolean"},
33+
},
34+
}
35+
36+
remote_file_identity_schema = {
37+
"type": "object",
38+
"properties": {
39+
"id": {"type": "string"},
40+
"remote_id": {"type": "string"},
41+
"parent_id": {"type": ["null", "string"]},
42+
"name": {"type": ["null", "string"]},
43+
"description": {"type": ["null", "string"]},
44+
"email_address": {"type": ["null", "string"]},
45+
"member_email_addresses": {"type": ["null", "array"]},
46+
"type": {"type": "string"},
47+
"modified_at": {"type": "string"},
48+
},
49+
}
50+
2651

2752
@total_ordering
2853
class ComparableType(Enum):
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream import AbstractFileBasedStream
22
from airbyte_cdk.sources.file_based.stream.default_file_based_stream import DefaultFileBasedStream
3+
from airbyte_cdk.sources.file_based.stream.identities_stream import IdentitiesStream
34

4-
__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream"]
5+
__all__ = ["AbstractFileBasedStream", "DefaultFileBasedStream", "IdentitiesStream"]

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
file_transfer_schema,
3131
merge_schemas,
3232
schemaless_schema,
33+
remote_file_permissions_schema,
3334
)
3435
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
3536
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
@@ -47,7 +48,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
4748

4849
FILE_TRANSFER_KW = "use_file_transfer"
4950
PRESERVE_DIRECTORY_STRUCTURE_KW = "preserve_directory_structure"
50-
SYNC_METADATA_KW = "sync_metadata"
51+
SYNC_ACL_PERMISSIONS_KW = "sync_acl_permissions"
5152
FILES_KEY = "files"
5253
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
5354
ab_last_mod_col = "_ab_source_file_last_modified"
@@ -57,7 +58,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
5758
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
5859
use_file_transfer = False
5960
preserve_directory_structure = True
60-
sync_metadata = False
61+
sync_acl_permissions = False
6162

6263
def __init__(self, **kwargs: Any):
6364
if self.FILE_TRANSFER_KW in kwargs:
@@ -66,8 +67,8 @@ def __init__(self, **kwargs: Any):
6667
self.preserve_directory_structure = kwargs.pop(
6768
self.PRESERVE_DIRECTORY_STRUCTURE_KW, True
6869
)
69-
if self.SYNC_METADATA_KW in kwargs:
70-
self.sync_metadata = kwargs.pop(self.SYNC_METADATA_KW, False)
70+
if self.SYNC_ACL_PERMISSIONS_KW in kwargs:
71+
self.sync_acl_permissions = kwargs.pop(self.SYNC_ACL_PERMISSIONS_KW, False)
7172
super().__init__(**kwargs)
7273

7374
@property
@@ -109,8 +110,8 @@ def _filter_schema_invalid_properties(
109110
self.ab_file_name_col: {"type": "string"},
110111
},
111112
}
112-
elif self.sync_metadata:
113-
return self.stream_reader.get_metadata_schema()
113+
elif self.sync_acl_permissions:
114+
return remote_file_permissions_schema
114115
else:
115116
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)
116117

@@ -193,9 +194,9 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
193194
yield stream_data_to_airbyte_message(
194195
self.name, record, is_file_transfer_message=True
195196
)
196-
elif self.sync_metadata:
197+
elif self.sync_acl_permissions:
197198
try:
198-
metadata_record = self.stream_reader.get_file_metadata(
199+
metadata_record = self.stream_reader.get_file_acl_permissions(
199200
file, logger=self.logger
200201
)
201202
yield stream_data_to_airbyte_message(
@@ -310,8 +311,8 @@ def get_json_schema(self) -> JsonSchema:
310311
def _get_raw_json_schema(self) -> JsonSchema:
311312
if self.use_file_transfer:
312313
return file_transfer_schema
313-
elif self.sync_metadata:
314-
return self.stream_reader.get_metadata_schema()
314+
elif self.sync_acl_permissions:
315+
return remote_file_permissions_schema
315316
elif self.config.input_schema:
316317
return self.config.get_input_schema() # type: ignore
317318
elif self.config.schemaless:

0 commit comments

Comments
 (0)