@@ -266,6 +266,55 @@ def migrate_cmd(args):
266
266
MIGRATE_PARSER .set_defaults (func = migrate_cmd )
267
267
268
268
269
+ def _partition_table (conf , log , table , metrics ):
270
+ if table_problems := pm_tap .get_table_compatibility_problems (conf .dbcmd , table ):
271
+ log .error (f"Cannot proceed: { table } { table_problems } " )
272
+ return None
273
+
274
+ map_data = pm_tap .get_partition_map (conf .dbcmd , table )
275
+
276
+ duration = table .partition_period or conf .partition_period
277
+
278
+ log .info (f"Evaluating { table } (duration={ duration } )" )
279
+ cur_pos = partitionmanager .database_helpers .get_position_of_table (
280
+ conf .dbcmd , table , map_data
281
+ )
282
+
283
+ sql_cmds = pm_tap .get_pending_sql_reorganize_partition_commands (
284
+ database = conf .dbcmd ,
285
+ table = table ,
286
+ partition_list = map_data ["partitions" ],
287
+ current_position = cur_pos ,
288
+ allowed_lifespan = duration ,
289
+ num_empty_partitions = conf .num_empty ,
290
+ evaluation_time = conf .curtime ,
291
+ )
292
+
293
+ if not sql_cmds :
294
+ log .debug (f"{ table } has no pending SQL updates." )
295
+ return None
296
+
297
+ composite_sql_command = "\n " .join (sql_cmds )
298
+
299
+ if conf .noop :
300
+ log .info (f"{ table } planned SQL: { composite_sql_command } " )
301
+ return {"sql" : composite_sql_command , "noop" : True }
302
+
303
+ log .info (f"{ table } running SQL: { composite_sql_command } " )
304
+
305
+ time_start = datetime .now (tz = timezone .utc )
306
+ output = conf .dbcmd .run (composite_sql_command )
307
+ time_end = datetime .now (tz = timezone .utc )
308
+ metrics .add (
309
+ "alter_time_seconds" ,
310
+ table .name ,
311
+ (time_end - time_start ).total_seconds (),
312
+ )
313
+
314
+ log .info (f"{ table } results: { output } " )
315
+ return {"sql" : composite_sql_command , "output" : output }
316
+
317
+
269
318
def do_partition (conf ):
270
319
"""Produces SQL statements to manage partitions per the supplied configuration.
271
320
@@ -298,56 +347,15 @@ def do_partition(conf):
298
347
299
348
all_results = {}
300
349
for table in conf .tables :
301
- time_start = None
302
350
try :
303
- table_problems = pm_tap .get_table_compatibility_problems (conf .dbcmd , table )
304
- if table_problems :
305
- log .error (f"Cannot proceed: { table } { table_problems } " )
306
- continue
307
-
308
- map_data = pm_tap .get_partition_map (conf .dbcmd , table )
309
-
310
- duration = conf .partition_period
311
- if table .partition_period :
312
- duration = table .partition_period
313
-
314
- log .info (f"Evaluating { table } (duration={ duration } )" )
315
- cur_pos = partitionmanager .database_helpers .get_position_of_table (
316
- conf .dbcmd , table , map_data
317
- )
318
-
319
- sql_cmds = pm_tap .get_pending_sql_reorganize_partition_commands (
320
- database = conf .dbcmd ,
321
- table = table ,
322
- partition_list = map_data ["partitions" ],
323
- current_position = cur_pos ,
324
- allowed_lifespan = duration ,
325
- num_empty_partitions = conf .num_empty ,
326
- evaluation_time = conf .curtime ,
327
- )
328
-
329
- if not sql_cmds :
330
- log .debug (f"{ table } has no pending SQL updates." )
331
- continue
332
-
333
- composite_sql_command = "\n " .join (sql_cmds )
334
-
335
- if conf .noop :
336
- all_results [table .name ] = {"sql" : composite_sql_command , "noop" : True }
337
- log .info (f"{ table } planned SQL: { composite_sql_command } " )
338
- continue
339
-
340
- log .info (f"{ table } running SQL: { composite_sql_command } " )
341
- time_start = datetime .now (tz = timezone .utc )
342
- output = conf .dbcmd .run (composite_sql_command )
343
-
344
- all_results [table .name ] = {"sql" : composite_sql_command , "output" : output }
345
- log .info (f"{ table } results: { output } " )
351
+ if results := _partition_table (conf , log , table , metrics ):
352
+ all_results [table .name ] = results
346
353
347
354
except partitionmanager .types .NoEmptyPartitionsAvailableException :
348
355
log .warning (
349
- f"Unable to automatically handle { table } : No empty "
350
- "partition is available."
356
+ "Unable to automatically handle %s: No empty "
357
+ "partition is available." ,
358
+ table ,
351
359
)
352
360
except partitionmanager .types .DatabaseCommandException as e :
353
361
log .warning ("Failed to automatically handle %s: %s" , table , e )
@@ -358,14 +366,6 @@ def do_partition(conf):
358
366
log .warning ("Failed to handle %s: %s" , table , e )
359
367
metrics .add ("alter_errors" , table .name , 1 )
360
368
361
- time_end = datetime .now (tz = timezone .utc )
362
- if time_start :
363
- metrics .add (
364
- "alter_time_seconds" ,
365
- table .name ,
366
- (time_end - time_start ).total_seconds (),
367
- )
368
-
369
369
if conf .prometheus_stats_path :
370
370
do_stats (conf , metrics = metrics )
371
371
return all_results
0 commit comments