Skip to content

Commit c4ca314

Browse files
committed
Fix set_initial_state for incremental_dependency
1 parent 741a2a0 commit c4ca314

File tree

1 file changed

+4
-12
lines changed

1 file changed

+4
-12
lines changed

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -280,20 +280,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:
280280

281281
parent_state = stream_state.get("parent_state", {})
282282

283-
# If `parent_state` doesn't exist and at least one parent stream has an incremental dependency,
284-
# copy the child state to parent streams with incremental dependencies.
285-
incremental_dependency = any(
286-
[parent_config.incremental_dependency for parent_config in self.parent_stream_configs]
287-
)
288-
if not parent_state and not incremental_dependency:
289-
return
290-
291-
if not parent_state and incremental_dependency:
292-
# Migrate child state to parent state format
293-
parent_state = self._migrate_child_state_to_parent_state(stream_state)
294-
295283
# Set state for each parent stream with an incremental dependency
296284
for parent_config in self.parent_stream_configs:
285+
if not parent_state.get(parent_config.stream.name, {}) and parent_config.incremental_dependency:
286+
# Migrate child state to parent state format
287+
parent_state = self._migrate_child_state_to_parent_state(stream_state)
288+
297289
if parent_config.incremental_dependency:
298290
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
299291

0 commit comments

Comments
 (0)