Skip to content

Commit 190bf80

Browse files
committed
Initial Commit: Retention-based Partition Dropping
Lacking tests, but can determine which partitions are wholly outside of the configured `retention_period` for a given table and emit commands to drop them.
1 parent ac91828 commit 190bf80

File tree

6 files changed

+228
-43
lines changed

6 files changed

+228
-43
lines changed

partitionmanager/cli.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import traceback
1111
import yaml
1212

13+
import partitionmanager.dropper
1314
import partitionmanager.migrate
1415
import partitionmanager.sql
1516
import partitionmanager.stats
@@ -121,10 +122,10 @@ def from_yaml_file(self, file):
121122
for key in data["tables"]:
122123
tab = partitionmanager.types.Table(key)
123124
tabledata = data["tables"][key]
124-
if isinstance(tabledata, dict) and "retention" in tabledata:
125-
tab.set_retention(
125+
if isinstance(tabledata, dict) and "retention_period" in tabledata:
126+
tab.set_retention_period(
126127
partitionmanager.types.timedelta_from_dict(
127-
tabledata["retention"]
128+
tabledata["retention_period"]
128129
)
129130
)
130131
if isinstance(tabledata, dict) and "partition_period" in tabledata:
@@ -289,6 +290,20 @@ def migrate_cmd(args):
289290
MIGRATE_PARSER.set_defaults(func=migrate_cmd)
290291

291292

293+
def drop_cmd(args):
294+
"""Calculates drop.
295+
296+
Helper for argparse.
297+
"""
298+
conf = config_from_args(args)
299+
300+
return do_find_drops_for_tables(conf)
301+
302+
303+
DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions")
304+
DROP_PARSER.set_defaults(func=drop_cmd)
305+
306+
292307
def do_partition(conf):
293308
"""Produces SQL statements to manage partitions per the supplied configuration.
294309
@@ -463,6 +478,46 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()):
463478
return all_results
464479

465480

481+
def do_find_drops_for_tables(conf):
482+
all_results = dict()
483+
for table in conf.tables:
484+
log = logging.getLogger(f"do_find_drops_for_tables:{table.name}")
485+
486+
if not table.has_date_query:
487+
log.debug(f"Cannot process {table}: no date query specified")
488+
continue
489+
490+
if not table.retention_period:
491+
log.debug(f"Cannot process {table}: no retention specified")
492+
continue
493+
494+
try:
495+
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
496+
if table_problems:
497+
log.debug(f"Cannot process {table}: {table_problems}")
498+
continue
499+
500+
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
501+
current_position = partitionmanager.database_helpers.get_position_of_table(
502+
conf.dbcmd, table, map_data
503+
)
504+
505+
droppable = partitionmanager.dropper.get_droppable_partitions(
506+
conf.dbcmd,
507+
map_data["partitions"],
508+
current_position,
509+
conf.curtime,
510+
table,
511+
)
512+
513+
all_results[table.name] = droppable
514+
except Exception as e:
515+
log.warning(f"Error processing table {table.name}")
516+
raise e
517+
518+
return all_results
519+
520+
466521
def main():
467522
"""Start here."""
468523
args = PARSER.parse_args()

partitionmanager/database_helpers.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""
2+
Helper functions for database operations
3+
"""
4+
5+
from datetime import datetime, timezone
6+
import logging
7+
8+
import partitionmanager.table_append_partition as pm_tap
9+
import partitionmanager.types
10+
11+
12+
def get_position_of_table(database, table, map_data):
13+
""" Returns a Position of the table at the current moment. """
14+
15+
pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"])
16+
17+
cur_pos = partitionmanager.types.Position()
18+
cur_pos.set_position([pos_list[col] for col in map_data["range_cols"]])
19+
20+
return cur_pos
21+
22+
23+
def calculate_exact_timestamp_via_query(database, table, position_partition):
24+
""" Calculates the exact timestamp of a PositionPartition.
25+
26+
raises ValueError if the position is incalculable
27+
"""
28+
29+
log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}")
30+
31+
if not table.has_date_query:
32+
raise ValueError("Table has no defined date query")
33+
34+
if not isinstance(position_partition, partitionmanager.types.PositionPartition):
35+
raise ValueError("Only PositionPartitions are supported")
36+
37+
if len(position_partition.position) != 1:
38+
raise ValueError(
39+
"This method is only valid for single-column partitions right now"
40+
)
41+
arg = position_partition.position.as_sql_input()[0]
42+
43+
sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg)
44+
log.debug(
45+
"Executing %s to derive partition %s at position %s",
46+
sql_select_cmd,
47+
position_partition.name,
48+
position_partition.position,
49+
)
50+
51+
start = datetime.now()
52+
exact_time_result = database.run(sql_select_cmd)
53+
end = datetime.now()
54+
55+
assert len(exact_time_result) == 1
56+
assert len(exact_time_result[0]) == 1
57+
for key, value in exact_time_result[0].items():
58+
exact_time = datetime.fromtimestamp(value, tz=timezone.utc)
59+
break
60+
61+
log.debug(
62+
"Exact time of %s returned for %s at position %s, query took %s",
63+
exact_time,
64+
position_partition.name,
65+
position_partition.position,
66+
(end - start),
67+
)
68+
return exact_time

partitionmanager/dropper.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
Determine which partitions can be dropped.
3+
"""
4+
5+
import logging
6+
7+
import partitionmanager.types
8+
import partitionmanager.tools
9+
10+
11+
def _drop_statement(table, partition_list):
12+
""" Generate an ALTER TABLE statement to drop these partitions. """
13+
14+
log = logging.getLogger("get_droppable_partitions")
15+
16+
partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list))
17+
18+
alter_cmd = (
19+
f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions} ;"
20+
)
21+
22+
log.debug("Yielding %s", alter_cmd)
23+
24+
return alter_cmd
25+
26+
27+
def get_droppable_partitions(
28+
database, partitions, current_position, current_timestamp, table
29+
):
30+
"""Return a dictionary of partitions which can be dropped and why."""
31+
log = logging.getLogger("get_droppable_partitions")
32+
results = {}
33+
droppable = []
34+
35+
if not table.retention_period:
36+
raise ValueError(f"{table.name} does not have a retention period set")
37+
38+
if not partitions:
39+
return results
40+
41+
for partition, next_partition in partitionmanager.tools.pairwise(partitions):
42+
if next_partition >= current_position:
43+
log.debug(
44+
"Stopping at %s because current position %s indicates "
45+
"subsequent partition is empty",
46+
partition,
47+
current_position,
48+
)
49+
break
50+
51+
if isinstance(next_partition, partitionmanager.types.MaxValuePartition):
52+
log.debug("Stopping at %s because we can't handle MaxValuePartitions.")
53+
break
54+
55+
assert isinstance(next_partition, partitionmanager.types.PositionPartition)
56+
57+
start_time = partitionmanager.database_helpers.calculate_exact_timestamp_via_query(
58+
database, table, partition
59+
)
60+
end_time = partitionmanager.database_helpers.calculate_exact_timestamp_via_query(
61+
database, table, next_partition
62+
)
63+
64+
approx_size = 0
65+
for a, b in zip(
66+
next_partition.position.as_list(), partition.position.as_list()
67+
):
68+
approx_size += a - b
69+
70+
oldest_age = current_timestamp - start_time
71+
youngest_age = current_timestamp - end_time
72+
73+
if youngest_age > table.retention_period:
74+
results[partition.name] = {
75+
"oldest_time": f"{start_time}",
76+
"youngest_time": f"{end_time}",
77+
"oldest_position": partition.position,
78+
"youngest_position": next_partition.position,
79+
"oldest_age": f"{oldest_age}",
80+
"youngest_age": f"{youngest_age}",
81+
"approx_size": approx_size,
82+
}
83+
droppable.append(partition)
84+
85+
if droppable:
86+
results["drop_query"] = _drop_statement(table, droppable)
87+
88+
return results

partitionmanager/table_append_partition.py

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
Design and perform partition management.
33
"""
44

5-
from datetime import datetime, timedelta, timezone
5+
from datetime import timedelta
66
import logging
77
import operator
88
import re
99

10+
import partitionmanager.database_helpers
1011
import partitionmanager.types
1112
import partitionmanager.tools
1213

@@ -417,48 +418,15 @@ def _get_rate_partitions_with_queried_timestamps(
417418
418419
The partitions' timestamps are explicitly queried.
419420
"""
420-
log = logging.getLogger(
421-
f"_get_rate_partitions_with_queried_timestamps:{table.name}"
422-
)
423421

424422
if not table.has_date_query:
425423
raise ValueError("Table has no defined date query")
426424

427425
instant_partitions = list()
428426

429427
for partition in partition_list:
430-
if len(partition.position) != 1:
431-
raise ValueError(
432-
"This method is only valid for single-column partitions right now"
433-
)
434-
arg = partition.position.as_sql_input()[0]
435-
436-
sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(
437-
arg
438-
)
439-
log.debug(
440-
"Executing %s to derive partition %s at position %s",
441-
sql_select_cmd,
442-
partition.name,
443-
partition.position,
444-
)
445-
446-
start = datetime.now()
447-
exact_time_result = database.run(sql_select_cmd)
448-
end = datetime.now()
449-
450-
assert len(exact_time_result) == 1
451-
assert len(exact_time_result[0]) == 1
452-
for key, value in exact_time_result[0].items():
453-
exact_time = datetime.fromtimestamp(value, tz=timezone.utc)
454-
break
455-
456-
log.debug(
457-
"Exact time of %s returned for %s at position %s, query took %s",
458-
exact_time,
459-
partition.name,
460-
partition.position,
461-
(end - start),
428+
exact_time = partitionmanager.database_helpers.calculate_exact_timestamp_via_query(
429+
database, table, partition
462430
)
463431

464432
instant_partitions.append(

partitionmanager/types.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ class Table:
3030

3131
def __init__(self, name):
3232
self.name = SqlInput(name)
33-
self.retention = None
33+
self.retention_period = None
3434
self.partition_period = None
3535
self.earliest_utc_timestamp_query = None
3636

37-
def set_retention(self, ret):
37+
def set_retention_period(self, ret):
3838
"""
3939
Sets the retention period as a timedelta for this table
4040
"""
4141
if not isinstance(ret, timedelta):
4242
raise ValueError("Must be a timedelta")
43-
self.retention = ret
43+
self.retention_period = ret
4444
return self
4545

4646
def set_partition_period(self, dur):
@@ -348,6 +348,9 @@ def __lt__(self, other):
348348
return False
349349
return True
350350

351+
def __ge__(self, other):
352+
return not self < other
353+
351354
def __eq__(self, other):
352355
if isinstance(other, PositionPartition):
353356
return self.name == other.name and self._position == other.position
@@ -397,6 +400,9 @@ def __lt__(self, other):
397400
return False
398401
return ValueError()
399402

403+
def __ge__(self, other):
404+
return not self < other
405+
400406
def __eq__(self, other):
401407
if isinstance(other, MaxValuePartition):
402408
return self.name == other.name and self._count == other.num_columns

partitionmanager/types_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def test_table(self):
146146
self.assertEqual(type(Table("name").name), SqlInput)
147147

148148
t = Table("t")
149-
self.assertEqual(None, t.retention)
149+
self.assertEqual(None, t.retention_period)
150150

151151
self.assertEqual(
152152
Table("a").set_partition_period(timedelta(days=9)).partition_period,

0 commit comments

Comments
 (0)