@@ -934,6 +934,17 @@ def create_concurrency_level(
934
934
parameters = {},
935
935
)
936
936
937
+ @staticmethod
938
+ def apply_stream_state_migrations (
939
+ stream_state_migrations : List [Any ], stream_state : MutableMapping [str , Any ]
940
+ ) -> MutableMapping [str , Any ]:
941
+ if stream_state_migrations :
942
+ for state_migration in stream_state_migrations :
943
+ if state_migration .should_migrate (stream_state ):
944
+ # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
945
+ stream_state = dict (state_migration .migrate (stream_state ))
946
+ return stream_state
947
+
937
948
def create_concurrent_cursor_from_datetime_based_cursor (
938
949
self ,
939
950
model_type : Type [BaseModel ],
@@ -954,11 +965,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
954
965
if "stream_state" not in kwargs
955
966
else kwargs ["stream_state" ]
956
967
)
957
- if stream_state_migrations :
958
- for state_migration in stream_state_migrations :
959
- if state_migration .should_migrate (stream_state ):
960
- # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
961
- stream_state = dict (state_migration .migrate (stream_state ))
968
+ stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
962
969
963
970
component_type = component_definition .get ("type" )
964
971
if component_definition .get ("type" ) != model_type .__name__ :
@@ -1246,12 +1253,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
1246
1253
stream_state_migrations = stream_state_migrations ,
1247
1254
)
1248
1255
)
1249
-
1250
- if stream_state_migrations :
1251
- for state_migration in stream_state_migrations :
1252
- if state_migration .should_migrate (stream_state ):
1253
- # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
1254
- stream_state = dict (state_migration .migrate (stream_state ))
1256
+ stream_state = self .apply_stream_state_migrations (stream_state_migrations , stream_state )
1255
1257
1256
1258
# Return the concurrent cursor and state converter
1257
1259
return ConcurrentPerPartitionCursor (
@@ -1711,15 +1713,13 @@ def _merge_stream_slicers(
1711
1713
config = config or {},
1712
1714
stream_state = {},
1713
1715
partition_router = stream_slicer ,
1714
- stream_state_migrations = model .state_migrations ,
1715
1716
)
1716
1717
return self .create_concurrent_cursor_from_datetime_based_cursor ( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
1717
1718
model_type = DatetimeBasedCursorModel ,
1718
1719
component_definition = model .incremental_sync .__dict__ ,
1719
1720
stream_name = model .name or "" ,
1720
1721
stream_namespace = None ,
1721
1722
config = config or {},
1722
- stream_state_migrations = model .state_migrations ,
1723
1723
)
1724
1724
1725
1725
incremental_sync_model = model .incremental_sync
0 commit comments