Skip to content

Commit 8f6b927

Browse files
committed
Add command: drop, to calculate partition drops based on retention periods
1 parent 190b94e commit 8f6b927

File tree

5 files changed

+252
-7
lines changed

5 files changed

+252
-7
lines changed

partitionmanager/cli.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import traceback
1111
import yaml
1212

13+
import partitionmanager.database_helpers
14+
import partitionmanager.dropper
1315
import partitionmanager.migrate
1416
import partitionmanager.sql
1517
import partitionmanager.stats
@@ -121,10 +123,10 @@ def from_yaml_file(self, file):
121123
for key in data["tables"]:
122124
tab = partitionmanager.types.Table(key)
123125
tabledata = data["tables"][key]
124-
if isinstance(tabledata, dict) and "retention" in tabledata:
125-
tab.set_retention(
126+
if isinstance(tabledata, dict) and "retention_period" in tabledata:
127+
tab.set_retention_period(
126128
partitionmanager.types.timedelta_from_dict(
127-
tabledata["retention"]
129+
tabledata["retention_period"]
128130
)
129131
)
130132
if isinstance(tabledata, dict) and "partition_period" in tabledata:
@@ -465,6 +467,57 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()):
465467
return all_results
466468

467469

470+
def drop_cmd(args):
471+
"""Calculates drop.
472+
Helper for argparse.
473+
"""
474+
conf = config_from_args(args)
475+
return do_find_drops_for_tables(conf)
476+
477+
478+
DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions")
479+
DROP_PARSER.set_defaults(func=drop_cmd)
480+
481+
482+
def do_find_drops_for_tables(conf):
483+
all_results = dict()
484+
for table in conf.tables:
485+
log = logging.getLogger(f"do_find_drops_for_tables:{table.name}")
486+
487+
if not table.has_date_query:
488+
log.debug(f"Cannot process {table}: no date query specified")
489+
continue
490+
491+
if not table.retention_period:
492+
log.debug(f"Cannot process {table}: no retention specified")
493+
continue
494+
495+
try:
496+
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
497+
if table_problems:
498+
log.debug(f"Cannot process {table}: {table_problems}")
499+
continue
500+
501+
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
502+
current_position = partitionmanager.database_helpers.get_position_of_table(
503+
conf.dbcmd, table, map_data
504+
)
505+
506+
droppable = partitionmanager.dropper.get_droppable_partitions(
507+
conf.dbcmd,
508+
map_data["partitions"],
509+
current_position,
510+
conf.curtime,
511+
table,
512+
)
513+
514+
all_results[table.name] = droppable
515+
except Exception as e:
516+
log.warning(f"Error processing table {table.name}")
517+
raise e
518+
return all_results
519+
520+
468521
def main():
469522
"""Start here."""
470523
args = PARSER.parse_args()

partitionmanager/database_helpers.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""
2+
Helper functions for database operations
3+
"""
4+
5+
from datetime import datetime, timezone
6+
import logging
7+
8+
import partitionmanager.table_append_partition as pm_tap
9+
import partitionmanager.types
10+
11+
12+
def get_position_of_table(database, table, map_data):
13+
"""Returns a Position of the table at the current moment."""
14+
15+
pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"])
16+
17+
cur_pos = partitionmanager.types.Position()
18+
cur_pos.set_position([pos_list[col] for col in map_data["range_cols"]])
19+
20+
return cur_pos
21+
22+
23+
def calculate_exact_timestamp_via_query(database, table, position_partition):
24+
"""Calculates the exact timestamp of a PositionPartition.
25+
26+
raises ValueError if the position is incalculable
27+
"""
28+
29+
log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}")
30+
31+
if not table.has_date_query:
32+
raise ValueError("Table has no defined date query")
33+
34+
if not isinstance(position_partition, partitionmanager.types.PositionPartition):
35+
raise ValueError("Only PositionPartitions are supported")
36+
37+
if len(position_partition.position) != 1:
38+
raise ValueError(
39+
"This method is only valid for single-column partitions right now"
40+
)
41+
arg = position_partition.position.as_sql_input()[0]
42+
43+
sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg)
44+
log.debug(
45+
"Executing %s to derive partition %s at position %s",
46+
sql_select_cmd,
47+
position_partition.name,
48+
position_partition.position,
49+
)
50+
51+
start = datetime.now()
52+
exact_time_result = database.run(sql_select_cmd)
53+
end = datetime.now()
54+
55+
if not len(exact_time_result) == 1:
56+
raise partitionmanager.types.NoExactTimeException("No exact timestamp result")
57+
if not len(exact_time_result[0]) == 1:
58+
raise partitionmanager.types.NoExactTimeException(
59+
"Unexpected row count for the timestamp result"
60+
)
61+
for key, value in exact_time_result[0].items():
62+
exact_time = datetime.fromtimestamp(value, tz=timezone.utc)
63+
break
64+
65+
log.debug(
66+
"Exact time of %s returned for %s at position %s, query took %s",
67+
exact_time,
68+
position_partition.name,
69+
position_partition.position,
70+
(end - start),
71+
)
72+
return exact_time

partitionmanager/dropper.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""
2+
Determine which partitions can be dropped.
3+
"""
4+
5+
import logging
6+
7+
import partitionmanager.types
8+
import partitionmanager.tools
9+
10+
11+
def _drop_statement(table, partition_list):
12+
"""Generate an ALTER TABLE statement to drop these partitions."""
13+
14+
log = logging.getLogger("get_droppable_partitions")
15+
16+
partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list))
17+
18+
alter_cmd = (
19+
f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions} ;"
20+
)
21+
22+
log.debug("Yielding %s", alter_cmd)
23+
24+
return alter_cmd
25+
26+
27+
def get_droppable_partitions(
28+
database, partitions, current_position, current_timestamp, table
29+
):
30+
"""Return a dictionary of partitions which can be dropped and why."""
31+
log = logging.getLogger("get_droppable_partitions")
32+
results = {}
33+
droppable = []
34+
35+
if not table.retention_period:
36+
raise ValueError(f"{table.name} does not have a retention period set")
37+
38+
if not partitions:
39+
return results
40+
41+
for partition, next_partition in partitionmanager.tools.pairwise(partitions):
42+
if next_partition >= current_position:
43+
log.debug(
44+
"Stopping at %s because current position %s indicates "
45+
"subsequent partition is empty",
46+
partition,
47+
current_position,
48+
)
49+
break
50+
51+
if isinstance(next_partition, partitionmanager.types.MaxValuePartition):
52+
log.debug("Stopping at %s because we can't handle MaxValuePartitions.")
53+
break
54+
55+
assert isinstance(next_partition, partitionmanager.types.PositionPartition)
56+
57+
approx_size = 0
58+
for a, b in zip(
59+
next_partition.position.as_list(), partition.position.as_list()
60+
):
61+
approx_size += a - b
62+
63+
try:
64+
start_time = (
65+
partitionmanager.database_helpers.calculate_exact_timestamp_via_query(
66+
database, table, partition
67+
)
68+
)
69+
end_time = (
70+
partitionmanager.database_helpers.calculate_exact_timestamp_via_query(
71+
database, table, next_partition
72+
)
73+
)
74+
75+
oldest_age = current_timestamp - start_time
76+
youngest_age = current_timestamp - end_time
77+
78+
if youngest_age > table.retention_period:
79+
results[partition.name] = {
80+
"oldest_time": f"{start_time}",
81+
"youngest_time": f"{end_time}",
82+
"oldest_position": partition.position,
83+
"youngest_position": next_partition.position,
84+
"oldest_age": f"{oldest_age}",
85+
"youngest_age": f"{youngest_age}",
86+
"approx_size": approx_size,
87+
}
88+
droppable.append(partition)
89+
except partitionmanager.types.NoExactTimeException:
90+
log.warning(
91+
"Couldn't determine exact times for %s.%s, it is probably droppable too.",
92+
table,
93+
partition,
94+
)
95+
96+
results[partition.name] = {
97+
"oldest_time": "unable to determine",
98+
"youngest_time": "unable to determine",
99+
"oldest_position": partition.position,
100+
"youngest_position": next_partition.position,
101+
"oldest_age": "unable to determine",
102+
"youngest_age": "unable to determine",
103+
"approx_size": approx_size,
104+
}
105+
droppable.append(partition)
106+
107+
if droppable:
108+
results["drop_query"] = _drop_statement(table, droppable)
109+
110+
return results

partitionmanager/types.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ class Table:
3030

3131
def __init__(self, name):
3232
self.name = SqlInput(name)
33-
self.retention = None
33+
self.retention_period = None
3434
self.partition_period = None
3535
self.earliest_utc_timestamp_query = None
3636

37-
def set_retention(self, ret):
37+
def set_retention_period(self, ret):
3838
"""
3939
Sets the retention period as a timedelta for this table
4040
"""
4141
if not isinstance(ret, timedelta):
4242
raise ValueError("Must be a timedelta")
43-
self.retention = ret
43+
self.retention_period = ret
4444
return self
4545

4646
def set_partition_period(self, dur):
@@ -350,6 +350,9 @@ def __lt__(self, other):
350350
return True
351351
return False
352352

353+
def __ge__(self, other):
354+
return not self < other
355+
353356
def __eq__(self, other):
354357
if isinstance(other, PositionPartition):
355358
return self.name == other.name and self._position == other.position
@@ -399,6 +402,9 @@ def __lt__(self, other):
399402
return False
400403
return ValueError()
401404

405+
def __ge__(self, other):
406+
return not self < other
407+
402408
def __eq__(self, other):
403409
if isinstance(other, MaxValuePartition):
404410
return self.name == other.name and self._count == other.num_columns
@@ -609,3 +615,7 @@ class NoEmptyPartitionsAvailableException(Exception):
609615

610616
class DatabaseCommandException(Exception):
611617
"""Raised if the database command failed."""
618+
619+
620+
class NoExactTimeException(Exception):
621+
"""Raised if there's no exact time available for this partition."""

partitionmanager/types_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def test_table(self):
146146
self.assertEqual(type(Table("name").name), SqlInput)
147147

148148
t = Table("t")
149-
self.assertEqual(None, t.retention)
149+
self.assertEqual(None, t.retention_period)
150150

151151
self.assertEqual(
152152
Table("a").set_partition_period(timedelta(days=9)).partition_period,

0 commit comments

Comments
 (0)