@@ -1474,6 +1474,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
1474
1474
stream_namespace : Optional [str ],
1475
1475
config : Config ,
1476
1476
message_repository : Optional [MessageRepository ] = None ,
1477
+ stream_state_migrations : Optional [List [Any ]] = None ,
1477
1478
** kwargs : Any ,
1478
1479
) -> ConcurrentCursor :
1479
1480
# Per-partition incremental streams can dynamically create child cursors which will pass their current
@@ -1484,6 +1485,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
1484
1485
if "stream_state" not in kwargs
1485
1486
else kwargs ["stream_state" ]
1486
1487
)
1488
+ stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
1487
1489
1488
1490
component_type = component_definition .get ("type" )
1489
1491
if component_definition .get ("type" ) != model_type .__name__ :
@@ -2186,7 +2188,8 @@ def _build_concurrent_cursor(
2186
2188
)
2187
2189
2188
2190
if model .incremental_sync and stream_slicer :
2189
- # FIXME should this be in create_concurrent_cursor_from_perpartition_cursor
2191
+ # FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for
2192
+ # ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor
2190
2193
if model .state_migrations :
2191
2194
state_transformations = [
2192
2195
self ._create_component_from_model (
@@ -3327,7 +3330,7 @@ def _get_url() -> str:
3327
3330
),
3328
3331
)
3329
3332
3330
- cursor_used_for_stop_condition = cursor if stop_condition_cursor else None
3333
+ cursor_used_for_stop_condition = stop_condition_cursor or None
3331
3334
paginator = (
3332
3335
self ._create_component_from_model (
3333
3336
model = model .paginator ,
0 commit comments