From 8f6b9274318c95490418044789a5a45df179f4b5 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Wed, 26 Apr 2023 14:22:17 -0700 Subject: [PATCH 1/9] Add command: drop, to calculate partition drops based on retention periods --- partitionmanager/cli.py | 59 +++++++++++++- partitionmanager/database_helpers.py | 72 ++++++++++++++++++ partitionmanager/dropper.py | 110 +++++++++++++++++++++++++++ partitionmanager/types.py | 16 +++- partitionmanager/types_test.py | 2 +- 5 files changed, 252 insertions(+), 7 deletions(-) create mode 100644 partitionmanager/database_helpers.py create mode 100644 partitionmanager/dropper.py diff --git a/partitionmanager/cli.py b/partitionmanager/cli.py index 6e319ff..f36bd1e 100644 --- a/partitionmanager/cli.py +++ b/partitionmanager/cli.py @@ -10,6 +10,8 @@ import traceback import yaml +import partitionmanager.database_helpers +import partitionmanager.dropper import partitionmanager.migrate import partitionmanager.sql import partitionmanager.stats @@ -121,10 +123,10 @@ def from_yaml_file(self, file): for key in data["tables"]: tab = partitionmanager.types.Table(key) tabledata = data["tables"][key] - if isinstance(tabledata, dict) and "retention" in tabledata: - tab.set_retention( + if isinstance(tabledata, dict) and "retention_period" in tabledata: + tab.set_retention_period( partitionmanager.types.timedelta_from_dict( - tabledata["retention"] + tabledata["retention_period"] ) ) if isinstance(tabledata, dict) and "partition_period" in tabledata: @@ -465,6 +467,57 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()): return all_results +def drop_cmd(args): + """Calculates drop. + Helper for argparse. 
+ """ + conf = config_from_args(args) + return do_find_drops_for_tables(conf) + + +DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions") +DROP_PARSER.set_defaults(func=drop_cmd) + + +def do_find_drops_for_tables(conf): + all_results = dict() + for table in conf.tables: + log = logging.getLogger(f"do_find_drops_for_tables:{table.name}") + + if not table.has_date_query: + log.debug(f"Cannot process {table}: no date query specified") + continue + + if not table.retention_period: + log.debug(f"Cannot process {table}: no retention specified") + continue + + try: + table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table) + if table_problems: + log.debug(f"Cannot process {table}: {table_problems}") + continue + + map_data = pm_tap.get_partition_map(conf.dbcmd, table) + current_position = partitionmanager.database_helpers.get_position_of_table( + conf.dbcmd, table, map_data + ) + + droppable = partitionmanager.dropper.get_droppable_partitions( + conf.dbcmd, + map_data["partitions"], + current_position, + conf.curtime, + table, + ) + + all_results[table.name] = droppable + except Exception as e: + log.warning(f"Error processing table {table.name}") + raise e + return all_results + + def main(): """Start here.""" args = PARSER.parse_args() diff --git a/partitionmanager/database_helpers.py b/partitionmanager/database_helpers.py new file mode 100644 index 0000000..2bae720 --- /dev/null +++ b/partitionmanager/database_helpers.py @@ -0,0 +1,72 @@ +""" +Helper functions for database operations +""" + +from datetime import datetime, timezone +import logging + +import partitionmanager.table_append_partition as pm_tap +import partitionmanager.types + + +def get_position_of_table(database, table, map_data): + """Returns a Position of the table at the current moment.""" + + pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"]) + + cur_pos = partitionmanager.types.Position() + cur_pos.set_position([pos_list[col] for col in 
map_data["range_cols"]]) + + return cur_pos + + +def calculate_exact_timestamp_via_query(database, table, position_partition): + """Calculates the exact timestamp of a PositionPartition. + + raises ValueError if the position is incalculable + """ + + log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}") + + if not table.has_date_query: + raise ValueError("Table has no defined date query") + + if not isinstance(position_partition, partitionmanager.types.PositionPartition): + raise ValueError("Only PositionPartitions are supported") + + if len(position_partition.position) != 1: + raise ValueError( + "This method is only valid for single-column partitions right now" + ) + arg = position_partition.position.as_sql_input()[0] + + sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg) + log.debug( + "Executing %s to derive partition %s at position %s", + sql_select_cmd, + position_partition.name, + position_partition.position, + ) + + start = datetime.now() + exact_time_result = database.run(sql_select_cmd) + end = datetime.now() + + if not len(exact_time_result) == 1: + raise partitionmanager.types.NoExactTimeException("No exact timestamp result") + if not len(exact_time_result[0]) == 1: + raise partitionmanager.types.NoExactTimeException( + "Unexpected row count for the timestamp result" + ) + for key, value in exact_time_result[0].items(): + exact_time = datetime.fromtimestamp(value, tz=timezone.utc) + break + + log.debug( + "Exact time of %s returned for %s at position %s, query took %s", + exact_time, + position_partition.name, + position_partition.position, + (end - start), + ) + return exact_time diff --git a/partitionmanager/dropper.py b/partitionmanager/dropper.py new file mode 100644 index 0000000..b4bf428 --- /dev/null +++ b/partitionmanager/dropper.py @@ -0,0 +1,110 @@ +""" +Determine which partitions can be dropped. 
+""" + +import logging + +import partitionmanager.types +import partitionmanager.tools + + +def _drop_statement(table, partition_list): + """Generate an ALTER TABLE statement to drop these partitions.""" + + log = logging.getLogger("get_droppable_partitions") + + partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list)) + + alter_cmd = ( + f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions} ;" + ) + + log.debug("Yielding %s", alter_cmd) + + return alter_cmd + + +def get_droppable_partitions( + database, partitions, current_position, current_timestamp, table +): + """Return a dictionary of partitions which can be dropped and why.""" + log = logging.getLogger("get_droppable_partitions") + results = {} + droppable = [] + + if not table.retention_period: + raise ValueError(f"{table.name} does not have a retention period set") + + if not partitions: + return results + + for partition, next_partition in partitionmanager.tools.pairwise(partitions): + if next_partition >= current_position: + log.debug( + "Stopping at %s because current position %s indicates " + "subsequent partition is empty", + partition, + current_position, + ) + break + + if isinstance(next_partition, partitionmanager.types.MaxValuePartition): + log.debug("Stopping at %s because we can't handle MaxValuePartitions.") + break + + assert isinstance(next_partition, partitionmanager.types.PositionPartition) + + approx_size = 0 + for a, b in zip( + next_partition.position.as_list(), partition.position.as_list() + ): + approx_size += a - b + + try: + start_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, partition + ) + ) + end_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, next_partition + ) + ) + + oldest_age = current_timestamp - start_time + youngest_age = current_timestamp - end_time + + if youngest_age > table.retention_period: + results[partition.name] = { + 
"oldest_time": f"{start_time}", + "youngest_time": f"{end_time}", + "oldest_position": partition.position, + "youngest_position": next_partition.position, + "oldest_age": f"{oldest_age}", + "youngest_age": f"{youngest_age}", + "approx_size": approx_size, + } + droppable.append(partition) + except partitionmanager.types.NoExactTimeException: + log.warning( + "Couldn't determine exact times for %s.%s, it is probably droppable too.", + table, + partition, + ) + + results[partition.name] = { + "oldest_time": "unable to determine", + "youngest_time": "unable to determine", + "oldest_position": partition.position, + "youngest_position": next_partition.position, + "oldest_age": "unable to determine", + "youngest_age": "unable to determine", + "approx_size": approx_size, + } + droppable.append(partition) + + if droppable: + results["drop_query"] = _drop_statement(table, droppable) + + return results diff --git a/partitionmanager/types.py b/partitionmanager/types.py index 774d3ba..b3c1eee 100644 --- a/partitionmanager/types.py +++ b/partitionmanager/types.py @@ -30,17 +30,17 @@ class Table: def __init__(self, name): self.name = SqlInput(name) - self.retention = None + self.retention_period = None self.partition_period = None self.earliest_utc_timestamp_query = None - def set_retention(self, ret): + def set_retention_period(self, ret): """ Sets the retention period as a timedelta for this table """ if not isinstance(ret, timedelta): raise ValueError("Must be a timedelta") - self.retention = ret + self.retention_period = ret return self def set_partition_period(self, dur): @@ -350,6 +350,9 @@ def __lt__(self, other): return True return False + def __ge__(self, other): + return not self < other + def __eq__(self, other): if isinstance(other, PositionPartition): return self.name == other.name and self._position == other.position @@ -399,6 +402,9 @@ def __lt__(self, other): return False return ValueError() + def __ge__(self, other): + return not self < other + def __eq__(self, 
other): if isinstance(other, MaxValuePartition): return self.name == other.name and self._count == other.num_columns @@ -609,3 +615,7 @@ class NoEmptyPartitionsAvailableException(Exception): class DatabaseCommandException(Exception): """Raised if the database command failed.""" + + +class NoExactTimeException(Exception): + """Raised if there's no exact time available for this partition.""" diff --git a/partitionmanager/types_test.py b/partitionmanager/types_test.py index 25a9014..5e28111 100644 --- a/partitionmanager/types_test.py +++ b/partitionmanager/types_test.py @@ -146,7 +146,7 @@ def test_table(self): self.assertEqual(type(Table("name").name), SqlInput) t = Table("t") - self.assertEqual(None, t.retention) + self.assertEqual(None, t.retention_period) self.assertEqual( Table("a").set_partition_period(timedelta(days=9)).partition_period, From cc9c7e9a4629c1adec2766c7862e09918c777ba1 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Wed, 26 Apr 2023 15:38:52 -0700 Subject: [PATCH 2/9] Deduplicate methods that moved into database_helpers --- partitionmanager/cli.py | 10 +---- partitionmanager/cli_test.py | 2 - partitionmanager/table_append_partition.py | 43 ++-------------------- 3 files changed, 6 insertions(+), 49 deletions(-) diff --git a/partitionmanager/cli.py b/partitionmanager/cli.py index f36bd1e..2078bb8 100644 --- a/partitionmanager/cli.py +++ b/partitionmanager/cli.py @@ -320,16 +320,10 @@ def do_partition(conf): duration = table.partition_period log.info(f"Evaluating {table} (duration={duration})") - - positions = pm_tap.get_current_positions( - conf.dbcmd, table, map_data["range_cols"] + cur_pos = partitionmanager.database_helpers.get_position_of_table( + conf.dbcmd, table, map_data ) - log.info(f"{table} (pos={positions})") - - cur_pos = partitionmanager.types.Position() - cur_pos.set_position([positions[col] for col in map_data["range_cols"]]) - sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands( database=conf.dbcmd, table=table, diff 
--git a/partitionmanager/cli_test.py b/partitionmanager/cli_test.py index 398ccf2..e2da77e 100644 --- a/partitionmanager/cli_test.py +++ b/partitionmanager/cli_test.py @@ -224,11 +224,9 @@ def test_partition_period_seven_days(self): [ "INFO:partition:Evaluating Table partitioned_last_week " "(duration=7 days, 0:00:00)", - "INFO:partition:Table partitioned_last_week (pos={'id': 150})", "DEBUG:partition:Table partitioned_last_week has no pending SQL updates.", "INFO:partition:Evaluating Table partitioned_yesterday " "(duration=7 days, 0:00:00)", - "INFO:partition:Table partitioned_yesterday (pos={'id': 150})", "DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.", ] ), diff --git a/partitionmanager/table_append_partition.py b/partitionmanager/table_append_partition.py index 87e7dee..7f8dc32 100644 --- a/partitionmanager/table_append_partition.py +++ b/partitionmanager/table_append_partition.py @@ -2,7 +2,7 @@ Design and perform partition management. """ -from datetime import datetime, timedelta, timezone +from datetime import timedelta import logging import operator import re @@ -425,9 +425,6 @@ def _get_rate_partitions_with_queried_timestamps( The partitions' timestamps are explicitly queried. 
""" - log = logging.getLogger( - f"_get_rate_partitions_with_queried_timestamps:{table.name}" - ) if not table.has_date_query: raise ValueError("Table has no defined date query") @@ -435,42 +432,10 @@ def _get_rate_partitions_with_queried_timestamps( instant_partitions = list() for partition in partition_list: - if len(partition.position) != 1: - raise ValueError( - "This method is only valid for single-column partitions right now" + exact_time = ( + partitionmanager.database_helpers.calculate_exact_timestamp_via_query( + database, table, partition ) - arg = partition.position.as_sql_input()[0] - - sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument( - arg - ) - log.debug( - "Executing %s to derive partition %s at position %s", - sql_select_cmd, - partition.name, - partition.position, - ) - - start = datetime.now() - exact_time_result = database.run(sql_select_cmd) - end = datetime.now() - - if not exact_time_result: - log.debug("No result found for position %s", arg) - continue - - assert len(exact_time_result) == 1 - assert len(exact_time_result[0]) == 1 - for key, value in exact_time_result[0].items(): - exact_time = datetime.fromtimestamp(value, tz=timezone.utc) - break - - log.debug( - "Exact time of %s returned for %s at position %s, query took %s", - exact_time, - partition.name, - partition.position, - (end - start), ) instant_partitions.append( From 0675e33bc0118eaf0a7cb2ac000ecaf57dcf5cef Mon Sep 17 00:00:00 2001 From: "J.C. 
Jones" Date: Wed, 26 Apr 2023 16:20:54 -0700 Subject: [PATCH 3/9] Add database helper tests --- partitionmanager/database_helpers_test.py | 79 +++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 partitionmanager/database_helpers_test.py diff --git a/partitionmanager/database_helpers_test.py b/partitionmanager/database_helpers_test.py new file mode 100644 index 0000000..d63ee68 --- /dev/null +++ b/partitionmanager/database_helpers_test.py @@ -0,0 +1,79 @@ +import unittest + +from .database_helpers import get_position_of_table, calculate_exact_timestamp_via_query + +from .types import ( + DatabaseCommand, + Table, + SqlInput, + SqlQuery, + PositionPartition, +) + + +class MockDatabase(DatabaseCommand): + def __init__(self): + self._responses = list() + self.num_queries = 0 + + def add_response(self, expected, response): + self._responses.append({"expected": expected, "response": response}) + + def run(self, cmd): + self.num_queries += 1 + if not self._responses: + raise Exception(f"No mock responses available for cmd [{cmd}]") + + r = self._responses.pop() + if r["expected"] in cmd: + return r["response"] + + raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]") + + def db_name(self): + return SqlInput("the-database") + + +class TestDatabaseHelpers(unittest.TestCase): + def test_position_of_table(self): + db = MockDatabase() + db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 90210}]) + + table = Table("burgers") + data = {"range_cols": ["id"]} + + pos = get_position_of_table(db, table, data) + self.assertEqual(pos.as_list(), [90210]) + + def test_exact_timestamp_no_query(self): + db = MockDatabase() + db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 42}]) + + table = Table("burgers") + self.assertFalse(table.has_date_query) + + pos = PositionPartition("p_start") + pos.set_position([42]) + + with self.assertRaises(ValueError): + calculate_exact_timestamp_via_query(db, table, pos) + + def 
test_exact_timestamp(self): + db = MockDatabase() + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", [{"UNIX_TIMESTAMP": 17541339060}] + ) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + + pos = PositionPartition("p_start") + pos.set_position([150]) + + ts = calculate_exact_timestamp_via_query(db, table, pos) + self.assertEqual(f"{ts}", "2525-11-11 18:11:00+00:00") From 84c61eb4f0b01342ad18b6daa9b9bc94d9c81a6f Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Fri, 28 Apr 2023 14:45:10 -0700 Subject: [PATCH 4/9] Add dropper tests --- partitionmanager/database_helpers_test.py | 2 +- partitionmanager/dropper.py | 7 +- partitionmanager/dropper_test.py | 118 ++++++++++++++++++++++ partitionmanager/types_test.py | 5 + 4 files changed, 128 insertions(+), 4 deletions(-) create mode 100644 partitionmanager/dropper_test.py diff --git a/partitionmanager/database_helpers_test.py b/partitionmanager/database_helpers_test.py index d63ee68..c34bbfe 100644 --- a/partitionmanager/database_helpers_test.py +++ b/partitionmanager/database_helpers_test.py @@ -17,7 +17,7 @@ def __init__(self): self.num_queries = 0 def add_response(self, expected, response): - self._responses.append({"expected": expected, "response": response}) + self._responses.insert(0, {"expected": expected, "response": response}) def run(self, cmd): self.num_queries += 1 diff --git a/partitionmanager/dropper.py b/partitionmanager/dropper.py index b4bf428..64d60aa 100644 --- a/partitionmanager/dropper.py +++ b/partitionmanager/dropper.py @@ -13,11 +13,12 @@ def _drop_statement(table, partition_list): log = logging.getLogger("get_droppable_partitions") + if not partition_list: + raise ValueError("Partition list may not be empty") + partitions = ",".join(map(lambda x: f"`{x.name}`", partition_list)) - alter_cmd = ( - f"ALTER TABLE `{table.name}` " 
f"DROP PARTITION IF EXISTS {partitions} ;" - ) + alter_cmd = f"ALTER TABLE `{table.name}` " f"DROP PARTITION IF EXISTS {partitions};" log.debug("Yielding %s", alter_cmd) diff --git a/partitionmanager/dropper_test.py b/partitionmanager/dropper_test.py new file mode 100644 index 0000000..c4b3b00 --- /dev/null +++ b/partitionmanager/dropper_test.py @@ -0,0 +1,118 @@ +import unittest +from datetime import datetime, timedelta, timezone + +from .dropper import _drop_statement, get_droppable_partitions +from .types import ( + DatabaseCommand, + Table, + SqlInput, + SqlQuery, + PositionPartition, +) +from .types_test import mkPPart, mkTailPart, mkPos + + +class MockDatabase(DatabaseCommand): + def __init__(self): + self._responses = list() + self.num_queries = 0 + + def add_response(self, expected, response): + self._responses.insert(0, {"expected": expected, "response": response}) + + def run(self, cmd): + self.num_queries += 1 + if not self._responses: + raise Exception(f"No mock responses available for cmd [{cmd}]") + + r = self._responses.pop() + if r["expected"] in cmd: + return r["response"] + + raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]") + + def db_name(self): + return SqlInput("the-database") + + +class TestDropper(unittest.TestCase): + def test_drop_statement_empty(self): + table = Table("burgers") + parts = [] + with self.assertRaises(ValueError): + _drop_statement(table, parts) + + def test_drop_statement(self): + table = Table("burgers") + parts = [PositionPartition("p_start")] + self.assertEqual( + _drop_statement(table, parts), + "ALTER TABLE `burgers` DROP PARTITION IF EXISTS `p_start`;", + ) + + def test_get_droppable_partitions_invalid_config(self): + database = MockDatabase() + table = Table("burgers") + partitions = [PositionPartition("p_start")] + current_timestamp = datetime(2021, 1, 1, tzinfo=timezone.utc) + current_position = PositionPartition("p_20210102").set_position([10]) + + with self.assertRaises(ValueError): + 
get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + + def test_get_droppable_partitions(self): + database = MockDatabase() + database.add_response("WHERE `id` > '100'", [{"UNIX_TIMESTAMP": 1621468800}]) + database.add_response("WHERE `id` > '200'", [{"UNIX_TIMESTAMP": 1622073600}]) + database.add_response("WHERE `id` > '200'", [{"UNIX_TIMESTAMP": 1622073600}]) + database.add_response("WHERE `id` > '300'", [{"UNIX_TIMESTAMP": 1622678400}]) + database.add_response("WHERE `id` > '300'", [{"UNIX_TIMESTAMP": 1622678400}]) + database.add_response("WHERE `id` > '400'", [{"UNIX_TIMESTAMP": 1623283200}]) + database.add_response("WHERE `id` > '400'", [{"UNIX_TIMESTAMP": 1623283200}]) + database.add_response("WHERE `id` > '500'", [{"UNIX_TIMESTAMP": 1623888000}]) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkPPart("3", 300), + mkPPart("4", 400), + mkPPart("5", 500), + mkPPart("6", 600), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=2)) + results = get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + self.assertEqual( + results["drop_query"], + "ALTER TABLE `burgers` DROP PARTITION IF EXISTS `1`,`2`;", + ) + self.assertEqual(results["1"]["oldest_time"], "2021-05-20 00:00:00+00:00") + self.assertEqual(results["1"]["youngest_time"], "2021-05-27 00:00:00+00:00") + self.assertEqual(results["1"]["oldest_position"].as_list(), [100]) + self.assertEqual(results["1"]["youngest_position"].as_list(), [200]) + self.assertEqual(results["1"]["oldest_age"], "42 days, 0:00:00") + self.assertEqual(results["1"]["youngest_age"], "35 days, 0:00:00") + 
self.assertEqual(results["1"]["approx_size"], 100) + + self.assertEqual(results["2"]["oldest_time"], "2021-05-27 00:00:00+00:00") + self.assertEqual(results["2"]["youngest_time"], "2021-06-03 00:00:00+00:00") + self.assertEqual(results["2"]["oldest_position"].as_list(), [200]) + self.assertEqual(results["2"]["youngest_position"].as_list(), [300]) + self.assertEqual(results["2"]["oldest_age"], "35 days, 0:00:00") + self.assertEqual(results["2"]["youngest_age"], "28 days, 0:00:00") + self.assertEqual(results["2"]["approx_size"], 100) diff --git a/partitionmanager/types_test.py b/partitionmanager/types_test.py index 5e28111..df9904d 100644 --- a/partitionmanager/types_test.py +++ b/partitionmanager/types_test.py @@ -153,6 +153,11 @@ def test_table(self): timedelta(days=9), ) + self.assertEqual( + Table("a").set_retention_period(timedelta(days=9)).retention_period, + timedelta(days=9), + ) + with self.assertRaises(argparse.ArgumentTypeError): timedelta_from_dict({"something": 1}) From d19f7c160e481386344b43d4a85f88ea27477abe Mon Sep 17 00:00:00 2001 From: "J.C. 
Jones" Date: Fri, 28 Apr 2023 15:17:35 -0700 Subject: [PATCH 5/9] More test cleanups --- partitionmanager/cli.py | 4 +-- partitionmanager/cli_test.py | 58 ++++++++++++++++++++++++++++++++ partitionmanager/dropper_test.py | 57 ++++++++++++++++++++++++++----- 3 files changed, 109 insertions(+), 10 deletions(-) diff --git a/partitionmanager/cli.py b/partitionmanager/cli.py index 2078bb8..723b650 100644 --- a/partitionmanager/cli.py +++ b/partitionmanager/cli.py @@ -479,11 +479,11 @@ def do_find_drops_for_tables(conf): log = logging.getLogger(f"do_find_drops_for_tables:{table.name}") if not table.has_date_query: - log.debug(f"Cannot process {table}: no date query specified") + log.warning(f"Cannot process {table}: no date query specified") continue if not table.retention_period: - log.debug(f"Cannot process {table}: no retention specified") + log.warning(f"Cannot process {table}: no retention specified") continue try: diff --git a/partitionmanager/cli_test.py b/partitionmanager/cli_test.py index e2da77e..a926bfd 100644 --- a/partitionmanager/cli_test.py +++ b/partitionmanager/cli_test.py @@ -8,6 +8,7 @@ migrate_cmd, config_from_args, do_partition, + drop_cmd, PARSER, partition_cmd, stats_cmd, @@ -624,3 +625,60 @@ def test_migrate_cmd_in_out(self): "flip", ] ) + + +class TestDropCmd(unittest.TestCase): + def _run_drop_cmd_yaml(self, yaml): + with tempfile.NamedTemporaryFile() as tmpfile: + insert_into_file(tmpfile, yaml) + args = PARSER.parse_args(["--config", tmpfile.name, "drop"]) + return drop_cmd(args) + + def test_drop_invalid_config(self): + with self.assertLogs( + "do_find_drops_for_tables:unused", level="WARNING" + ) as logctx: + self._run_drop_cmd_yaml( + f""" +partitionmanager: + mariadb: {str(fake_exec)} + tables: + unused: + earliest_utc_timestamp_query: > + SELECT UNIX_TIMESTAMP(`issued`) FROM `unused` + WHERE `id` > '?' 
ORDER BY `id` ASC LIMIT 1; +""" + ) + self.assertEqual( + set(logctx.output), + set( + [ + "WARNING:do_find_drops_for_tables:unused:" + "Cannot process Table unused: no retention specified" + ] + ), + ) + + def test_drop_no_sql(self): + with self.assertLogs( + "do_find_drops_for_tables:unused", level="WARNING" + ) as logctx: + self._run_drop_cmd_yaml( + f""" +partitionmanager: + mariadb: {str(fake_exec)} + tables: + unused: + retention_period: + days: 180 + """ + ) + self.assertEqual( + set(logctx.output), + set( + [ + "WARNING:do_find_drops_for_tables:unused:" + "Cannot process Table unused: no date query specified" + ] + ), + ) diff --git a/partitionmanager/dropper_test.py b/partitionmanager/dropper_test.py index c4b3b00..7f5ddc2 100644 --- a/partitionmanager/dropper_test.py +++ b/partitionmanager/dropper_test.py @@ -12,6 +12,12 @@ from .types_test import mkPPart, mkTailPart, mkPos +def _timestamp_rsp(year, mo, day): + return [ + {"UNIX_TIMESTAMP": datetime(year, mo, day, tzinfo=timezone.utc).timestamp()} + ] + + class MockDatabase(DatabaseCommand): def __init__(self): self._responses = list() @@ -64,14 +70,14 @@ def test_get_droppable_partitions_invalid_config(self): def test_get_droppable_partitions(self): database = MockDatabase() - database.add_response("WHERE `id` > '100'", [{"UNIX_TIMESTAMP": 1621468800}]) - database.add_response("WHERE `id` > '200'", [{"UNIX_TIMESTAMP": 1622073600}]) - database.add_response("WHERE `id` > '200'", [{"UNIX_TIMESTAMP": 1622073600}]) - database.add_response("WHERE `id` > '300'", [{"UNIX_TIMESTAMP": 1622678400}]) - database.add_response("WHERE `id` > '300'", [{"UNIX_TIMESTAMP": 1622678400}]) - database.add_response("WHERE `id` > '400'", [{"UNIX_TIMESTAMP": 1623283200}]) - database.add_response("WHERE `id` > '400'", [{"UNIX_TIMESTAMP": 1623283200}]) - database.add_response("WHERE `id` > '500'", [{"UNIX_TIMESTAMP": 1623888000}]) + database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 20)) + 
database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 6, 3)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 6, 3)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 6, 10)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 6, 10)) + database.add_response("WHERE `id` > '500'", _timestamp_rsp(2021, 6, 17)) table = Table("burgers") table.set_earliest_utc_timestamp_query( @@ -116,3 +122,38 @@ def test_get_droppable_partitions(self): self.assertEqual(results["2"]["oldest_age"], "35 days, 0:00:00") self.assertEqual(results["2"]["youngest_age"], "28 days, 0:00:00") self.assertEqual(results["2"]["approx_size"], 100) + + def test_drop_nothing_to_do(self): + database = MockDatabase() + database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 1)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 8)) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 8)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 5, 19)) + database.add_response("WHERE `id` > '300'", _timestamp_rsp(2021, 5, 19)) + database.add_response("WHERE `id` > '400'", _timestamp_rsp(2021, 5, 24)) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' 
ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 6, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkPPart("3", 300), + mkPPart("4", 400), + mkPPart("5", 500), + mkPPart("6", 600), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=30)) + results = get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + self.assertNotIn("drop_query", results) From 41f2e3d48bc4c9d8a22cdb161192ddcf11dd44bc Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Mon, 2 Oct 2023 13:53:04 -0700 Subject: [PATCH 6/9] Update to PyLint 2.17.7 to fix Python11 --- .github/workflows/ci.yml | 2 +- .pre-commit-config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a8bc4f..f354ff0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: - name: Install Linting Tools run: | python -m pip install --upgrade pip - pip install --user pylint==2.6.0 + pip install --user pylint==2.17.7 pip install --user black~=22.3 pip install --user flake8~=4.0 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0e3382c..ccccb8a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,7 @@ repos: hooks: - id: flake8 - repo: https://github.com/PyCQA/pylint - rev: pylint-2.6.0 + rev: v2.17.7 hooks: - id: pylint args: From cc9bd01d87f2526798dcb4df4005e82d644d785e Mon Sep 17 00:00:00 2001 From: "J.C. 
Jones" Date: Fri, 1 Dec 2023 14:09:52 -0700 Subject: [PATCH 7/9] More tests --- partitionmanager/database_helpers.py | 2 +- partitionmanager/database_helpers_test.py | 36 +++++++++++++-- partitionmanager/dropper_test.py | 54 ++++++++++++++++++++++- partitionmanager/types_test.py | 5 +++ 4 files changed, 91 insertions(+), 6 deletions(-) diff --git a/partitionmanager/database_helpers.py b/partitionmanager/database_helpers.py index 2bae720..98400d1 100644 --- a/partitionmanager/database_helpers.py +++ b/partitionmanager/database_helpers.py @@ -56,7 +56,7 @@ def calculate_exact_timestamp_via_query(database, table, position_partition): raise partitionmanager.types.NoExactTimeException("No exact timestamp result") if not len(exact_time_result[0]) == 1: raise partitionmanager.types.NoExactTimeException( - "Unexpected row count for the timestamp result" + "Unexpected column count for the timestamp result" ) for key, value in exact_time_result[0].items(): exact_time = datetime.fromtimestamp(value, tz=timezone.utc) diff --git a/partitionmanager/database_helpers_test.py b/partitionmanager/database_helpers_test.py index c34bbfe..188752c 100644 --- a/partitionmanager/database_helpers_test.py +++ b/partitionmanager/database_helpers_test.py @@ -4,10 +4,11 @@ from .types import ( DatabaseCommand, - Table, + NoExactTimeException, + PositionPartition, SqlInput, SqlQuery, - PositionPartition, + Table, ) @@ -76,4 +77,33 @@ def test_exact_timestamp(self): pos.set_position([150]) ts = calculate_exact_timestamp_via_query(db, table, pos) - self.assertEqual(f"{ts}", "2525-11-11 18:11:00+00:00") + assert f"{ts}" == "2525-11-11 18:11:00+00:00" + + def test_no_exact_timestamp(self): + db = MockDatabase() + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", + [{"UNIX_TIMESTAMP": 17541339060}, {"UNIX_TIMESTAMP": 17541339070}], + ) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `type` = \"burger\" 
AND `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + + pos = PositionPartition("p_start") + pos.set_position([150]) + + with self.assertRaises(NoExactTimeException): + calculate_exact_timestamp_via_query(db, table, pos) + + db.add_response( + "SELECT UNIX_TIMESTAMP(`cooked`)", + [{"UNIX_TIMESTAMP": 17541339060, "column2": True}], + ) + + with self.assertRaises(NoExactTimeException): + calculate_exact_timestamp_via_query(db, table, pos) diff --git a/partitionmanager/dropper_test.py b/partitionmanager/dropper_test.py index 7f5ddc2..2e5a3a9 100644 --- a/partitionmanager/dropper_test.py +++ b/partitionmanager/dropper_test.py @@ -4,10 +4,10 @@ from .dropper import _drop_statement, get_droppable_partitions from .types import ( DatabaseCommand, - Table, + PositionPartition, SqlInput, SqlQuery, - PositionPartition, + Table, ) from .types_test import mkPPart, mkTailPart, mkPos @@ -68,6 +68,22 @@ def test_get_droppable_partitions_invalid_config(self): database, partitions, current_position, current_timestamp, table ) + def test_no_droppable_partitions(self): + database = MockDatabase() + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' 
ORDER BY `id` ASC LIMIT 1;" + ) + ) + table.set_retention_period(timedelta(days=2)) + current_timestamp = datetime(2021, 1, 1, tzinfo=timezone.utc) + current_position = PositionPartition("p_20210102").set_position([10]) + assert {} == get_droppable_partitions( + database, [], current_position, current_timestamp, table + ) + def test_get_droppable_partitions(self): database = MockDatabase() database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 20)) @@ -157,3 +173,37 @@ def test_drop_nothing_to_do(self): database, partitions, current_position, current_timestamp, table ) self.assertNotIn("drop_query", results) + + +def test_get_droppable_partitions_no_exact_times(caplog): + database = MockDatabase() + resp = _timestamp_rsp(2021, 5, 20) + resp.extend(_timestamp_rsp(2021, 5, 21)) + database.add_response("WHERE `id` > '100'", resp) + database.add_response("WHERE `id` > '200'", _timestamp_rsp(2021, 5, 27)) + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("1", 100), + mkPPart("2", 200), + mkTailPart("z"), + ] + current_position = mkPos(340) + + table.set_retention_period(timedelta(days=2)) + + get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + assert ( + "Couldn't determine exact times for Table burgers.1: (100), it is probably droppable too." 
+ in caplog.messages + ) diff --git a/partitionmanager/types_test.py b/partitionmanager/types_test.py index df9904d..ee7cd50 100644 --- a/partitionmanager/types_test.py +++ b/partitionmanager/types_test.py @@ -1,5 +1,6 @@ import argparse import unittest +import pytest from datetime import datetime, timedelta, timezone from .types import ( ChangePlannedPartition, @@ -184,6 +185,10 @@ def test_table(self): ) self.assertTrue(t.has_date_query) + def test_invalid_timedelta_string(self): + with pytest.raises(AttributeError): + assert timedelta_from_dict("30s") + def test_changed_partition(self): with self.assertRaises(ValueError): ChangePlannedPartition("bob") From 44799fb37ba07beef895343e737fab0dcadbb4a1 Mon Sep 17 00:00:00 2001 From: "J.C. Jones" Date: Wed, 28 Feb 2024 12:14:44 -0700 Subject: [PATCH 8/9] pylint needs pytest --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f354ff0..38a2795 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,6 +20,7 @@ jobs: pip install --user pylint==2.17.7 pip install --user black~=22.3 pip install --user flake8~=4.0 + pip install --user pytest - name: Install Partition Manager run: | From f4d546e9dced4b4a9f045cee4f6443ee3d78e3ee Mon Sep 17 00:00:00 2001 From: "J.C.
Jones" Date: Wed, 28 Feb 2024 12:24:29 -0700 Subject: [PATCH 9/9] Add an assertion for correct ordering of partitions --- partitionmanager/dropper.py | 3 +++ partitionmanager/dropper_test.py | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/partitionmanager/dropper.py b/partitionmanager/dropper.py index 64d60aa..26557a4 100644 --- a/partitionmanager/dropper.py +++ b/partitionmanager/dropper.py @@ -39,6 +39,9 @@ def get_droppable_partitions( if not partitions: return results + if sorted(partitions) != partitions: + raise ValueError(f"Supplied partitions are not correctly sorted: {partitions}") + for partition, next_partition in partitionmanager.tools.pairwise(partitions): if next_partition >= current_position: log.debug( diff --git a/partitionmanager/dropper_test.py b/partitionmanager/dropper_test.py index 2e5a3a9..af3899e 100644 --- a/partitionmanager/dropper_test.py +++ b/partitionmanager/dropper_test.py @@ -139,6 +139,32 @@ def test_get_droppable_partitions(self): self.assertEqual(results["2"]["youngest_age"], "28 days, 0:00:00") self.assertEqual(results["2"]["approx_size"], 100) + def test_get_droppable_partitions_out_of_order(self): + database = MockDatabase() + + table = Table("burgers") + table.set_earliest_utc_timestamp_query( + SqlQuery( + "SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` " + "WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;" + ) + ) + current_timestamp = datetime(2021, 7, 1, tzinfo=timezone.utc) + + partitions = [ + mkPPart("2", 200), + mkPPart("1", 100), + mkPPart("3", 300), + mkTailPart("z"), + ] + current_position = mkPos(140) + table.set_retention_period(timedelta(days=2)) + + with self.assertRaises(ValueError): + get_droppable_partitions( + database, partitions, current_position, current_timestamp, table + ) + def test_drop_nothing_to_do(self): database = MockDatabase() database.add_response("WHERE `id` > '100'", _timestamp_rsp(2021, 5, 1))