16
16
17
17
from __future__ import annotations
18
18
19
+ import argparse
19
20
import json
20
21
import pkgutil
21
22
import sys
22
23
import traceback
23
- from collections .abc import Mapping
24
+ from collections .abc import MutableMapping
24
25
from pathlib import Path
25
26
from typing import Any , cast
26
27
27
28
import orjson
29
+ import yaml
28
30
29
31
from airbyte_cdk .entrypoint import AirbyteEntrypoint , launch
30
32
from airbyte_cdk .models import (
@@ -54,7 +56,7 @@ class SourceLocalYaml(YamlDeclarativeSource):
54
56
def __init__ (
55
57
self ,
56
58
catalog : ConfiguredAirbyteCatalog | None ,
57
- config : Mapping [str , Any ] | None ,
59
+ config : MutableMapping [str , Any ] | None ,
58
60
state : TState ,
59
61
** kwargs : Any ,
60
62
) -> None :
@@ -91,7 +93,8 @@ def handle_command(args: list[str]) -> None:
91
93
92
94
def _get_local_yaml_source (args : list [str ]) -> SourceLocalYaml :
93
95
try :
94
- config , catalog , state = _parse_inputs_into_config_catalog_state (args )
96
+ parsed_args = AirbyteEntrypoint .parse_args (args )
97
+ config , catalog , state = _parse_inputs_into_config_catalog_state (parsed_args )
95
98
return SourceLocalYaml (config = config , catalog = catalog , state = state )
96
99
except Exception as error :
97
100
print (
@@ -162,21 +165,40 @@ def create_declarative_source(
162
165
connector builder.
163
166
"""
164
167
try :
165
- config : Mapping [str , Any ] | None
168
+ config : MutableMapping [str , Any ] | None
166
169
catalog : ConfiguredAirbyteCatalog | None
167
170
state : list [AirbyteStateMessage ]
168
- config , catalog , state = _parse_inputs_into_config_catalog_state (args )
169
- if config is None or "__injected_declarative_manifest" not in config :
171
+
172
+ parsed_args = AirbyteEntrypoint .parse_args (args )
173
+ config , catalog , state = _parse_inputs_into_config_catalog_state (parsed_args )
174
+
175
+ if config is None :
176
+ raise ValueError (
177
+ "Invalid config: `__injected_declarative_manifest` should be provided at the root "
178
+ "of the config or using the --manifest-path argument."
179
+ )
180
+
181
+ # If a manifest_path is provided in the args, inject it into the config
182
+ if hasattr (parsed_args , "manifest_path" ) and parsed_args .manifest_path :
183
+ injected_manifest = _parse_manifest_from_file (parsed_args .manifest_path )
184
+ if injected_manifest :
185
+ config ["__injected_declarative_manifest" ] = injected_manifest
186
+
187
+ if "__injected_declarative_manifest" not in config :
170
188
raise ValueError (
171
189
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
172
- f"of the config but config only has keys: { list (config .keys () if config else [])} "
190
+ "of the config or using the --manifest-path argument. "
191
+ f"Config only has keys: { list (config .keys () if config else [])} "
173
192
)
174
193
if not isinstance (config ["__injected_declarative_manifest" ], dict ):
175
194
raise ValueError (
176
195
"Invalid config: `__injected_declarative_manifest` should be a dictionary, "
177
196
f"but got type: { type (config ['__injected_declarative_manifest' ])} "
178
197
)
179
198
199
+ if hasattr (parsed_args , "components_path" ) and parsed_args .components_path :
200
+ _register_components_from_file (parsed_args .components_path )
201
+
180
202
return ConcurrentDeclarativeSource (
181
203
config = config ,
182
204
catalog = catalog ,
@@ -205,13 +227,12 @@ def create_declarative_source(
205
227
206
228
207
229
def _parse_inputs_into_config_catalog_state (
208
- args : list [ str ] ,
230
+ parsed_args : argparse . Namespace ,
209
231
) -> tuple [
210
- Mapping [str , Any ] | None ,
232
+ MutableMapping [str , Any ] | None ,
211
233
ConfiguredAirbyteCatalog | None ,
212
234
list [AirbyteStateMessage ],
213
235
]:
214
- parsed_args = AirbyteEntrypoint .parse_args (args )
215
236
config = (
216
237
ConcurrentDeclarativeSource .read_config (parsed_args .config )
217
238
if hasattr (parsed_args , "config" )
@@ -231,6 +252,44 @@ def _parse_inputs_into_config_catalog_state(
231
252
return config , catalog , state
232
253
233
254
255
+ def _parse_manifest_from_file (filepath : str ) -> dict [str , Any ] | None :
256
+ """Extract and parse a manifest file specified in the args."""
257
+ try :
258
+ with open (filepath , "r" , encoding = "utf-8" ) as manifest_file :
259
+ manifest_content = yaml .safe_load (manifest_file )
260
+ if manifest_content is None :
261
+ raise ValueError (f"Manifest file at { filepath } is empty" )
262
+ if not isinstance (manifest_content , dict ):
263
+ raise ValueError (f"Manifest must be a dictionary, got { type (manifest_content )} " )
264
+ return manifest_content
265
+ except Exception as error :
266
+ raise ValueError (f"Failed to load manifest file from { filepath } : { error } " )
267
+
268
+
269
+ def _register_components_from_file (filepath : str ) -> None :
270
+ """Load and register components from a Python file specified in the args."""
271
+ import importlib .util
272
+ import sys
273
+
274
+ components_path = Path (filepath )
275
+
276
+ module_name = "components"
277
+ sdm_module_name = "source_declarative_manifest.components"
278
+
279
+ # Create module spec
280
+ spec = importlib .util .spec_from_file_location (module_name , components_path )
281
+ if spec is None or spec .loader is None :
282
+ raise ImportError (f"Could not load module from { components_path } " )
283
+
284
+ # Create module and execute code, registering the module before executing its code
285
+ # To avoid issues with dataclasses that look up the module
286
+ module = importlib .util .module_from_spec (spec )
287
+ sys .modules [module_name ] = module
288
+ sys .modules [sdm_module_name ] = module
289
+
290
+ spec .loader .exec_module (module )
291
+
292
+
234
293
def run () -> None :
235
294
args : list [str ] = sys .argv [1 :]
236
295
handle_command (args )
0 commit comments