Skip to content

Retention-based Partition Dropping #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ jobs:
- name: Install Linting Tools
run: |
python -m pip install --upgrade pip
pip install --user pylint==2.6.0
pip install --user pylint==2.17.7
pip install --user black~=22.3
pip install --user flake8~=4.0
pip install --user pytest

- name: Install Partition Manager
run: |
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/PyCQA/pylint
rev: pylint-2.6.0
rev: v2.17.7
hooks:
- id: pylint
args:
Expand Down
69 changes: 58 additions & 11 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import traceback
import yaml

import partitionmanager.database_helpers
import partitionmanager.dropper
import partitionmanager.migrate
import partitionmanager.sql
import partitionmanager.stats
Expand Down Expand Up @@ -121,10 +123,10 @@ def from_yaml_file(self, file):
for key in data["tables"]:
tab = partitionmanager.types.Table(key)
tabledata = data["tables"][key]
if isinstance(tabledata, dict) and "retention" in tabledata:
tab.set_retention(
if isinstance(tabledata, dict) and "retention_period" in tabledata:
tab.set_retention_period(
partitionmanager.types.timedelta_from_dict(
tabledata["retention"]
tabledata["retention_period"]
)
)
if isinstance(tabledata, dict) and "partition_period" in tabledata:
Expand Down Expand Up @@ -318,16 +320,10 @@ def do_partition(conf):
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")

positions = pm_tap.get_current_positions(
conf.dbcmd, table, map_data["range_cols"]
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

log.info(f"{table} (pos={positions})")

cur_pos = partitionmanager.types.Position()
cur_pos.set_position([positions[col] for col in map_data["range_cols"]])

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
Expand Down Expand Up @@ -465,6 +461,57 @@ def do_stats(conf, metrics=partitionmanager.stats.PrometheusMetrics()):
return all_results


def drop_cmd(args):
"""Calculates drop.
Helper for argparse.
"""
conf = config_from_args(args)
return do_find_drops_for_tables(conf)


DROP_PARSER = SUBPARSERS.add_parser("drop", help="drop old partitions")
DROP_PARSER.set_defaults(func=drop_cmd)


def do_find_drops_for_tables(conf):
all_results = dict()
for table in conf.tables:
log = logging.getLogger(f"do_find_drops_for_tables:{table.name}")

if not table.has_date_query:
log.warning(f"Cannot process {table}: no date query specified")
continue

if not table.retention_period:
log.warning(f"Cannot process {table}: no retention specified")
continue

try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.debug(f"Cannot process {table}: {table_problems}")
continue

map_data = pm_tap.get_partition_map(conf.dbcmd, table)
current_position = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

droppable = partitionmanager.dropper.get_droppable_partitions(
conf.dbcmd,
map_data["partitions"],
current_position,
conf.curtime,
table,
)

all_results[table.name] = droppable
except Exception as e:
log.warning(f"Error processing table {table.name}")
raise e
return all_results


def main():
"""Start here."""
args = PARSER.parse_args()
Expand Down
60 changes: 58 additions & 2 deletions partitionmanager/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
migrate_cmd,
config_from_args,
do_partition,
drop_cmd,
PARSER,
partition_cmd,
stats_cmd,
Expand Down Expand Up @@ -224,11 +225,9 @@ def test_partition_period_seven_days(self):
[
"INFO:partition:Evaluating Table partitioned_last_week "
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_last_week (pos={'id': 150})",
"DEBUG:partition:Table partitioned_last_week has no pending SQL updates.",
"INFO:partition:Evaluating Table partitioned_yesterday "
"(duration=7 days, 0:00:00)",
"INFO:partition:Table partitioned_yesterday (pos={'id': 150})",
"DEBUG:partition:Table partitioned_yesterday has no pending SQL updates.",
]
),
Expand Down Expand Up @@ -626,3 +625,60 @@ def test_migrate_cmd_in_out(self):
"flip",
]
)


class TestDropCmd(unittest.TestCase):
def _run_drop_cmd_yaml(self, yaml):
with tempfile.NamedTemporaryFile() as tmpfile:
insert_into_file(tmpfile, yaml)
args = PARSER.parse_args(["--config", tmpfile.name, "drop"])
return drop_cmd(args)

def test_drop_invalid_config(self):
with self.assertLogs(
"do_find_drops_for_tables:unused", level="WARNING"
) as logctx:
self._run_drop_cmd_yaml(
f"""
partitionmanager:
mariadb: {str(fake_exec)}
tables:
unused:
earliest_utc_timestamp_query: >
SELECT UNIX_TIMESTAMP(`issued`) FROM `unused`
WHERE `id` > '?' ORDER BY `id` ASC LIMIT 1;
"""
)
self.assertEqual(
set(logctx.output),
set(
[
"WARNING:do_find_drops_for_tables:unused:"
"Cannot process Table unused: no retention specified"
]
),
)

def test_drop_no_sql(self):
with self.assertLogs(
"do_find_drops_for_tables:unused", level="WARNING"
) as logctx:
self._run_drop_cmd_yaml(
f"""
partitionmanager:
mariadb: {str(fake_exec)}
tables:
unused:
retention_period:
days: 180
"""
)
self.assertEqual(
set(logctx.output),
set(
[
"WARNING:do_find_drops_for_tables:unused:"
"Cannot process Table unused: no date query specified"
]
),
)
72 changes: 72 additions & 0 deletions partitionmanager/database_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Helper functions for database operations
"""

from datetime import datetime, timezone
import logging

import partitionmanager.table_append_partition as pm_tap
import partitionmanager.types


def get_position_of_table(database, table, map_data):
"""Returns a Position of the table at the current moment."""

pos_list = pm_tap.get_current_positions(database, table, map_data["range_cols"])

cur_pos = partitionmanager.types.Position()
cur_pos.set_position([pos_list[col] for col in map_data["range_cols"]])

return cur_pos


def calculate_exact_timestamp_via_query(database, table, position_partition):
"""Calculates the exact timestamp of a PositionPartition.

raises ValueError if the position is incalculable
"""

log = logging.getLogger(f"calculate_exact_timestamp_via_query:{table.name}")

if not table.has_date_query:
raise ValueError("Table has no defined date query")

if not isinstance(position_partition, partitionmanager.types.PositionPartition):
raise ValueError("Only PositionPartitions are supported")

if len(position_partition.position) != 1:
raise ValueError(
"This method is only valid for single-column partitions right now"
)
arg = position_partition.position.as_sql_input()[0]

sql_select_cmd = table.earliest_utc_timestamp_query.get_statement_with_argument(arg)
log.debug(
"Executing %s to derive partition %s at position %s",
sql_select_cmd,
position_partition.name,
position_partition.position,
)

start = datetime.now()
exact_time_result = database.run(sql_select_cmd)
end = datetime.now()

if not len(exact_time_result) == 1:
raise partitionmanager.types.NoExactTimeException("No exact timestamp result")
if not len(exact_time_result[0]) == 1:
raise partitionmanager.types.NoExactTimeException(
"Unexpected column count for the timestamp result"
)
for key, value in exact_time_result[0].items():
exact_time = datetime.fromtimestamp(value, tz=timezone.utc)
break

log.debug(
"Exact time of %s returned for %s at position %s, query took %s",
exact_time,
position_partition.name,
position_partition.position,
(end - start),
)
return exact_time
109 changes: 109 additions & 0 deletions partitionmanager/database_helpers_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import unittest

from .database_helpers import get_position_of_table, calculate_exact_timestamp_via_query

from .types import (
DatabaseCommand,
NoExactTimeException,
PositionPartition,
SqlInput,
SqlQuery,
Table,
)


class MockDatabase(DatabaseCommand):
def __init__(self):
self._responses = list()
self.num_queries = 0

def add_response(self, expected, response):
self._responses.insert(0, {"expected": expected, "response": response})

def run(self, cmd):
self.num_queries += 1
if not self._responses:
raise Exception(f"No mock responses available for cmd [{cmd}]")

r = self._responses.pop()
if r["expected"] in cmd:
return r["response"]

raise Exception(f"Received command [{cmd}] and expected [{r['expected']}]")

def db_name(self):
return SqlInput("the-database")


class TestDatabaseHelpers(unittest.TestCase):
def test_position_of_table(self):
db = MockDatabase()
db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 90210}])

table = Table("burgers")
data = {"range_cols": ["id"]}

pos = get_position_of_table(db, table, data)
self.assertEqual(pos.as_list(), [90210])

def test_exact_timestamp_no_query(self):
db = MockDatabase()
db.add_response("SELECT id FROM `burgers` ORDER BY", [{"id": 42}])

table = Table("burgers")
self.assertFalse(table.has_date_query)

pos = PositionPartition("p_start")
pos.set_position([42])

with self.assertRaises(ValueError):
calculate_exact_timestamp_via_query(db, table, pos)

def test_exact_timestamp(self):
db = MockDatabase()
db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)", [{"UNIX_TIMESTAMP": 17541339060}]
)

table = Table("burgers")
table.set_earliest_utc_timestamp_query(
SqlQuery(
"SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` "
"WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;"
)
)

pos = PositionPartition("p_start")
pos.set_position([150])

ts = calculate_exact_timestamp_via_query(db, table, pos)
assert f"{ts}" == "2525-11-11 18:11:00+00:00"

def test_no_exact_timestamp(self):
db = MockDatabase()
db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)",
[{"UNIX_TIMESTAMP": 17541339060}, {"UNIX_TIMESTAMP": 17541339070}],
)

table = Table("burgers")
table.set_earliest_utc_timestamp_query(
SqlQuery(
"SELECT UNIX_TIMESTAMP(`cooked`) FROM `orders` "
"WHERE `type` = \"burger\" AND `id` > '?' ORDER BY `id` ASC LIMIT 1;"
)
)

pos = PositionPartition("p_start")
pos.set_position([150])

with self.assertRaises(NoExactTimeException):
calculate_exact_timestamp_via_query(db, table, pos)

db.add_response(
"SELECT UNIX_TIMESTAMP(`cooked`)",
[{"UNIX_TIMESTAMP": 17541339060, "column2": True}],
)

with self.assertRaises(NoExactTimeException):
calculate_exact_timestamp_via_query(db, table, pos)
Loading