Commit 03a9cc8

Predictive partitioning (#3)
* First pass algorithm
* Add ability to compare partition positions
* Add a split method for dividing partition lists
* More tests
* Add a position rate function
* Add methods to determine a weighted rate of increase
* Add docs to the new table_append_partition methods
* Use the Partition timestamp() method
* plan_partition_changes algorithm
* More partition planning tests
* Predictive partitioning algorithm functioning in tests
* Rework the CLI to use the new partition planning algorithm
* Passing integration tests
* Handle short and bespoke partition names.
* Improve logging
* Remove spurious strip
* Moving to 0.2.0
* Logging cleanups
* Fix a host of pylint issues

  $ pylint --ignore-patterns=.*_test.py partitionmanager/ --disable W1203 --disable invalid-name --disable bad-continuation
  ************* Module partitionmanager.tools
  partitionmanager/tools.py:22:11: R1708: Do not raise StopIteration in generator, use return statement instead (stop-iteration-return)
  ************* Module partitionmanager.sql
  partitionmanager/sql.py:36:0: R0903: Too few public methods (1/2) (too-few-public-methods)
  ************* Module partitionmanager.stats
  partitionmanager/stats.py:12:0: R0903: Too few public methods (0/2) (too-few-public-methods)
  partitionmanager/stats.py:65:0: R0912: Too many branches (14/12) (too-many-branches)
  ************* Module partitionmanager.table_append_partition
  partitionmanager/table_append_partition.py:98:0: R0914: Too many local variables (16/15) (too-many-locals)
  partitionmanager/table_append_partition.py:306:0: R0914: Too many local variables (23/15) (too-many-locals)
  ------------------------------------------------------------------
  Your code has been rated at 9.92/10 (previous run: 9.91/10, +0.01)

* Better logging on partition
* Never adjust the active_partition

  MariaDB has a limitation on editing the active partition, particularly:
  `ERROR 1520 (HY000): Reorganize of range partitions cannot change total ranges except for last partition where it can extend the range`
  so we can't edit the active partition, either.

* Never edit positions on empty partitions

  Like the previous commit, MariaDB has a limitation on editing any partition's offset:
  `ERROR 1520 (HY000): Reorganize of range partitions cannot change total ranges except for last partition where it can extend the range`
  So the positions field should never be edited for existing partitions, only their names.

* Consolidate logic to use partition names as start-of-fill dates
* stderr is not so useful from the Subprocess Database Command, let's dump it
* Bugfix: get_current_positions needs to query the latest of each column

  Before, get_current_positions returned each column for the entry with the largest ID from the first column, while for partitioning purposes we actually want to always be strictly increasing. This does make such tables less space-efficient, but that's a matter for partition design.

* Add "bootstrap" methods to prepare partitioned tables

  Tables whose partitions don't contain datestamps of the p_YYYYMMDD form don't provide partman enough info to derive rates of change, so these bootstrap routines will save a YAML file somewhere with point-in-time data that can be reloaded to derive a rate-of-change. This is only intended to be used for the initial partitioning of a table, or when a table has no empty partitions.

  In a subsequent commit I'll tie this into cli.py, ensuring to add alerts that these ALTERs cannot be expected to complete quickly, that likely the database will hold locks for substantial amounts of time for each of the ALTER commands, and the tool will simply be printing potential ALTER commands to console for an operator to analyze and run in the manner they find best.

* Wire up Bootstrap to the CLI
* Rework CLI to print yaml-like but stringified output
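The `get_current_positions` bugfix described in the commit message can be illustrated with a small, database-free sketch. The rows, the column names, and both helper functions below are hypothetical and only model the before/after behaviour; the real function queries the database, and treating "latest" as the per-column maximum is an assumption made for illustration.

```python
# Hypothetical rows from a table range-partitioned on (id, serial).
rows = [
    {"id": 1, "serial": 90},
    {"id": 2, "serial": 50},
    {"id": 3, "serial": 70},
]
columns = ["id", "serial"]


def positions_from_latest_row(rows, columns):
    """Old behaviour: read every column from the row with the largest first column."""
    latest = max(rows, key=lambda row: row[columns[0]])
    return {col: latest[col] for col in columns}


def positions_per_column(rows, columns):
    """Fixed behaviour (as assumed here): take each column's own largest value."""
    return {col: max(row[col] for row in rows) for col in columns}


old_positions = positions_from_latest_row(rows, columns)
new_positions = positions_per_column(rows, columns)
print(old_positions)  # {'id': 3, 'serial': 70}
print(new_positions)  # {'id': 3, 'serial': 90}
```

With the old behaviour the reported `serial` position can move backwards between runs, which is why the fix reads each column independently.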
1 parent 52a41a3 commit 03a9cc8

16 files changed: +2355 -434 lines changed

README.md

Lines changed: 41 additions & 0 deletions
@@ -34,6 +34,7 @@ partitionmanager:
   # mariadb: /usr/local/bin/mariadb
   partition_period:
     days: 7
+  num_empty: 2

   tables:
     table1:
@@ -48,6 +49,46 @@ partitionmanager:
 ```


+# Algorithm
+
+For a given table and that table's intended partition period, desired end-state is to have:
+- All the existing partitions containing data,
+- A configurable number of trailing partitions which contain no data, and
+- An "active" partition currently being filled with data
+
+To make it easier to manage, we give all the filled partitions a name to indicate the approximate date that partition began being filled with data. This date is approximate because once a partition contains data, it is no longer an instant `ALTER` operation to rename the partition, rather every contained row gets copied, so this tool predicts the date at which the new partition will become the "active" one.
+
+Inputs:
+- The table name
+- The intended partition period
+- The number of trailing partitions to keep
+- The table's current partition list
+- The table's partition id's current value(s)
+
+Outputs:
+- An intended partition list, changing only the empty partitions, or
+- If no partitions can be reorganized, an error.
+
+Procedure:
+- Using the current values, split the partition list into two sub-lists: empty partitions, and non-empty partitions.
+- If there are no empty partitions:
+  - Raise an error and halt the algorithm.
+
+- Perform a statistical regression using each non-empty partition to determine each partition's fill rate.
+- Using each partition's fill rate and their age, predict the future partition fill rate.
+- Create a new list of intended empty partitions.
+- For each empty partition:
+  - Predict the start-of-fill date using the partition's position relative to the current active partition, the current active partition's date, the partition period, and the future partition fill rate.
+  - Predict the end-of-fill value using the start-of-fill date and the future partition fill rate.
+  - If the start-of-fill date is different than the partition's name, rename the partition.
+  - If the end-of-fill value is different than the partition's current value, change that value.
+  - Append the changed partition to the intended empty partition list.
+- While the number of empty partitions is less than the intended number of trailing partitions to keep:
+  - Predict the start-of-fill date for a new partition using the previous partition's date and the partition period.
+  - Predict the end-of-fill value using the start-of-fill date and the future partition fill rate.
+  - Append the new partition to the intended empty partition list.
+- Return the lists of non-empty partitions, the current empty partitions, and the post-algorithm intended empty partitions.
+
 # TODOs

 Lots. A drop mechanism, for one. Yet more tests, particularly live integration tests with a test DB, for another.
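
The procedure in the README's Algorithm section can be sketched roughly as follows. This is a simplified illustration, not the code from this commit: `Part`, `split_partitions`, `weighted_rate` and `plan_empty` are hypothetical names, a linearly weighted average stands in for the statistical regression, positions are single integers, the trailing `MAXVALUE` partition is ignored, and existing empty partitions are only re-dated (their upper bounds are left alone, in line with the MariaDB limitation noted in the commit message).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

DAY = timedelta(days=1)


@dataclass
class Part:
    """Hypothetical stand-in for a range partition (not the project's type)."""

    start: datetime  # start-of-fill date, as encoded in a p_YYYYMMDD name
    end_value: int  # exclusive upper bound (VALUES LESS THAN ...)

    @property
    def name(self):
        return f"p_{self.start:%Y%m%d}"


def split_partitions(parts, position):
    """Split an ordered partition list into (filled, active, empty)."""
    for i, part in enumerate(parts):
        if position < part.end_value:
            return parts[:i], part, parts[i + 1:]
    raise ValueError("current position is beyond every partition")


def weighted_rate(filled, active, now, position):
    """Estimate rows per day, weighting recent intervals more heavily."""
    # When a partition begins filling, the position has just reached the
    # previous partition's upper bound, which gives one (time, value) sample.
    later = filled[1:] + [active]
    samples = [(nxt.start, prev.end_value) for prev, nxt in zip(filled, later)]
    samples.append((now, position))
    rates = [
        (v2 - v1) / ((t2 - t1) / DAY)
        for (t1, v1), (t2, v2) in zip(samples, samples[1:])
    ]
    if not rates:
        raise ValueError("not enough history to estimate a fill rate")
    weights = range(1, len(rates) + 1)  # assumed linear weighting
    return sum(w * r for w, r in zip(weights, rates)) / sum(weights)


def plan_empty(active, empty, now, position, rate, period, num_empty):
    """Return the intended list of empty partitions."""
    if not empty:
        raise ValueError("no empty partitions to reorganize")
    planned, prev_end = [], active.end_value
    for part in empty:
        # An existing empty partition starts filling once the position
        # reaches its predecessor's upper bound; only its date is revised.
        start = now + timedelta(days=(prev_end - position) / rate)
        planned.append(Part(start, part.end_value))
        prev_end = part.end_value
    while len(planned) < num_empty:
        # New partitions extend the schedule by one period each.
        prev = planned[-1]
        planned.append(
            Part(prev.start + period, prev.end_value + int(rate * (period / DAY)))
        )
    return planned


parts = [
    Part(datetime(2021, 1, 1), 100),
    Part(datetime(2021, 1, 11), 200),
    Part(datetime(2021, 1, 21), 300),
    Part(datetime(2021, 1, 31), 400),
]
now, position = datetime(2021, 1, 26), 250
filled, active, empty = split_partitions(parts, position)
rate = weighted_rate(filled, active, now, position)
planned = plan_empty(active, empty, now, position, rate, timedelta(days=10), 2)
print(rate, [(p.name, p.end_value) for p in planned])
```

In this made-up data set the table grows by 10 rows per day, so the existing empty partition keeps its `p_20210131` name and one new partition, `p_20210210` with an upper bound of 500, is appended to reach two trailing empty partitions.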

partitionmanager/bootstrap.py

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
+"""
+Bootstrap a table that does not have sufficient partitions to determine rates
+of change.
+"""
+
+from datetime import timedelta
+import logging
+import operator
+import yaml
+
+from partitionmanager.types import (
+    ChangePlannedPartition,
+    MaxValuePartition,
+    NewPlannedPartition,
+)
+from partitionmanager.table_append_partition import (
+    table_is_compatible,
+    get_current_positions,
+    get_partition_map,
+    generate_sql_reorganize_partition_commands,
+)
+from .tools import iter_show_end
+
+RATE_UNIT = timedelta(hours=1)
+MINIMUM_FUTURE_DELTA = timedelta(hours=2)
+
+
+def write_state_info(conf, out_fp):
+    """
+    Write the state info for tables defined in conf to the provided file-like
+    object.
+    """
+    log = logging.getLogger("write_state_info")
+
+    log.info("Writing current state information")
+    state_info = {"time": conf.curtime, "tables": dict()}
+    for table in conf.tables:
+        problem = table_is_compatible(conf.dbcmd, table)
+        if problem:
+            raise Exception(problem)
+
+        map_data = get_partition_map(conf.dbcmd, table)
+        positions = get_current_positions(conf.dbcmd, table, map_data["range_cols"])
+
+        log.info(f'(Table("{table.name}"): {positions}),')
+        state_info["tables"][str(table.name)] = positions
+
+    yaml.dump(state_info, out_fp)
+
+
+def _get_time_offsets(num_entries, first_delta, subseq_delta):
+    """
+    Construct a list of timedeltas of size num_entries of the form
+    [ first_delta, subseq_delta, [subseq_delta...] ]
+    """
+    if num_entries < 1:
+        raise ValueError("Must request at least one entry")
+
+    time_units = [first_delta]
+    while len(time_units) < num_entries:
+        prev = time_units[-1]
+        time_units.append(prev + subseq_delta)
+
+    return time_units
+
+
+def _plan_partitions_for_time_offsets(
+    now_time, time_offsets, rate_of_change, ordered_current_pos, max_val_part
+):
+    """
+    Return a list of PlannedPartitions, starting from now, corresponding to
+    each supplied offset that will represent the positions then from the
+    supplied current positions and the rate of change. The first planned
+    partition will be altered out of the supplied MaxValue partition.
+    """
+    changes = list()
+    for (i, offset), is_final in iter_show_end(enumerate(time_offsets)):
+        increase = [x * offset / RATE_UNIT for x in rate_of_change]
+        predicted_positions = [
+            int(p + i) for p, i in zip(ordered_current_pos, increase)
+        ]
+        predicted_time = now_time + offset
+
+        part = None
+        if i == 0:
+            part = (
+                ChangePlannedPartition(max_val_part)
+                .set_position(predicted_positions)
+                .set_timestamp(predicted_time)
+            )
+
+        else:
+            part = NewPlannedPartition().set_timestamp(predicted_time)
+
+            if is_final:
+                part.set_columns(len(predicted_positions))
+            else:
+                part.set_position(predicted_positions)
+
+        changes.append(part)
+    return changes
+
+
+def calculate_sql_alters_from_state_info(conf, in_fp):
+    """
+    Using the config and the input yaml file-like object, return the SQL
+    statements to bootstrap the tables in config that also have data in
+    the input yaml as a dictionary of { Table -> list(SQL ALTER statements) }
+    """
+    log = logging.getLogger("calculate_sql_alters")
+
+    log.info("Reading prior state information")
+    prior_data = yaml.safe_load(in_fp)
+
+    time_delta = (conf.curtime - prior_data["time"]) / RATE_UNIT
+    if time_delta <= 0:
+        raise ValueError(
+            f"Time delta is too small: {conf.curtime} - "
+            f"{prior_data['time']} = {time_delta}"
+        )
+
+    commands = dict()
+
+    for table_name, prior_pos in prior_data["tables"].items():
+        table = None
+        for t in conf.tables:
+            if t.name == table_name:
+                table = t
+        if not table:
+            log.info(f"Skipping {table_name} as it is not in the current config")
+            continue
+
+        problem = table_is_compatible(conf.dbcmd, table)
+        if problem:
+            raise Exception(problem)
+
+        map_data = get_partition_map(conf.dbcmd, table)
+        current_positions = get_current_positions(
+            conf.dbcmd, table, map_data["range_cols"]
+        )
+
+        ordered_current_pos = [
+            current_positions[name] for name in map_data["range_cols"]
+        ]
+        ordered_prior_pos = [prior_pos[name] for name in map_data["range_cols"]]
+
+        delta_positions = list(
+            map(operator.sub, ordered_current_pos, ordered_prior_pos)
+        )
+        rate_of_change = list(map(lambda pos: pos / time_delta, delta_positions))
+
+        max_val_part = map_data["partitions"][-1]
+        if not isinstance(max_val_part, MaxValuePartition):
+            log.error(f"Expected a MaxValue partition, got {max_val_part}")
+            raise Exception("Unexpected part?")
+
+        log.info(
+            f"{table}, {time_delta:0.1f} hours, {ordered_prior_pos} - {ordered_current_pos}, "
+            f"{delta_positions} pos_change, {rate_of_change}/hour"
+        )
+
+        part_duration = conf.partition_period
+        if table.partition_period:
+            part_duration = table.partition_period
+
+        time_offsets = _get_time_offsets(
+            1 + conf.num_empty, MINIMUM_FUTURE_DELTA, part_duration
+        )
+
+        changes = _plan_partitions_for_time_offsets(
+            conf.curtime,
+            time_offsets,
+            rate_of_change,
+            ordered_current_pos,
+            max_val_part,
+        )
+
+        commands[table.name] = list(
+            generate_sql_reorganize_partition_commands(table, changes)
+        )
+
+    return commands
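
The arithmetic behind `calculate_sql_alters_from_state_info` and `_plan_partitions_for_time_offsets` can be checked in isolation. The sketch below is a standalone approximation, assuming plain integer positions and no database access; `project_positions` is a hypothetical helper, not part of the module. The input values come from `test_read_state_info` in `bootstrap_test.py`.

```python
from datetime import datetime, timedelta

RATE_UNIT = timedelta(hours=1)  # rates are expressed per hour, as in the module


def project_positions(prior_time, prior_pos, now, current_pos, offsets):
    """Extrapolate each column's position linearly to now + offset."""
    hours = (now - prior_time) / RATE_UNIT
    if hours <= 0:
        raise ValueError("the prior snapshot must be older than now")
    rates = [(cur - old) / hours for cur, old in zip(current_pos, prior_pos)]
    return [
        (
            now + offset,
            [
                int(cur + rate * (offset / RATE_UNIT))
                for cur, rate in zip(current_pos, rates)
            ],
        )
        for offset in offsets
    ]


# id was 0 on 2021-03-01 and 150 on 2021-03-03: 150 / 48 h = 3.125 per hour.
offsets = [timedelta(hours=2), timedelta(days=30, hours=2)]
projected = project_positions(
    datetime(2021, 3, 1), [0], datetime(2021, 3, 3), [150], offsets
)
for when, positions in projected:
    print(f"p_{when:%Y%m%d}", positions)
# p_20210303 [156]
# p_20210402 [2406]
```

These match the `VALUES LESS THAN (156)` and `(2406)` bounds expected by the test; in the module, the final planned partition receives `MAXVALUE` rather than a projected number.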

partitionmanager/bootstrap_test.py

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
+import io
+import unittest
+import yaml
+from datetime import datetime, timedelta
+
+from .bootstrap import (
+    _get_time_offsets,
+    calculate_sql_alters_from_state_info,
+    write_state_info,
+)
+from .cli import Config
+from .types import DatabaseCommand, Table, SqlInput
+
+
+class MockDatabase(DatabaseCommand):
+    def __init__(self):
+        self.response = []
+        self.num_queries = 0
+
+    def run(self, cmd):
+        self.num_queries += 1
+
+        if "CREATE_OPTIONS" in cmd:
+            return [{"CREATE_OPTIONS": "partitioned"}]
+
+        if "SHOW CREATE TABLE" in cmd:
+            return [
+                {
+                    "Create Table": """CREATE TABLE `burgers` (
+                        `id` bigint(20) NOT NULL AUTO_INCREMENT,
+                        PRIMARY KEY (`id`),
+                    ) ENGINE=InnoDB AUTO_INCREMENT=150 DEFAULT CHARSET=utf8
+                    PARTITION BY RANGE (`id`)
+                    (PARTITION `p_start` VALUES LESS THAN MAXVALUE ENGINE = InnoDB)"""
+                }
+            ]
+
+        if "SELECT" in cmd:
+            return [{"id": 150}]
+
+        return self.response
+
+    def db_name(self):
+        return SqlInput("the-database")
+
+
+class TestBootstrapTool(unittest.TestCase):
+    def test_writing_state_info(self):
+        conf = Config()
+        conf.curtime = datetime(2021, 3, 1)
+        conf.dbcmd = MockDatabase()
+        conf.tables = [Table("test")]
+
+        out = io.StringIO()
+
+        write_state_info(conf, out)
+
+        written_yaml = yaml.safe_load(out.getvalue())
+
+        self.assertEqual(
+            written_yaml, {"tables": {"test": {"id": 150}}, "time": conf.curtime}
+        )
+
+    def test_get_time_offsets(self):
+        self.assertEqual(
+            _get_time_offsets(1, timedelta(hours=4), timedelta(days=30)),
+            [timedelta(hours=4)],
+        )
+
+        self.assertEqual(
+            _get_time_offsets(2, timedelta(hours=4), timedelta(days=30)),
+            [timedelta(hours=4), timedelta(days=30, hours=4)],
+        )
+
+        self.assertEqual(
+            _get_time_offsets(3, timedelta(hours=4), timedelta(days=30)),
+            [
+                timedelta(hours=4),
+                timedelta(days=30, hours=4),
+                timedelta(days=60, hours=4),
+            ],
+        )
+
+    def test_read_state_info(self):
+        conf_past = Config()
+        conf_past.curtime = datetime(2021, 3, 1)
+        conf_past.dbcmd = MockDatabase()
+        conf_past.tables = [Table("test").set_partition_period(timedelta(days=30))]
+
+        state_fs = io.StringIO()
+        yaml.dump({"tables": {"test": {"id": 0}}, "time": conf_past.curtime}, state_fs)
+        state_fs.seek(0)
+
+        with self.assertRaises(ValueError):
+            calculate_sql_alters_from_state_info(conf_past, state_fs)
+
+        conf_now = Config()
+        conf_now.curtime = datetime(2021, 3, 3)
+        conf_now.dbcmd = MockDatabase()
+        conf_now.tables = [Table("test").set_partition_period(timedelta(days=30))]
+
+        state_fs.seek(0)
+        x = calculate_sql_alters_from_state_info(conf_now, state_fs)
+        self.assertEqual(
+            x,
+            {
+                "test": [
+                    "ALTER TABLE `test` REORGANIZE PARTITION `p_start` INTO "
+                    "(PARTITION `p_20210303` VALUES LESS THAN (156), "
+                    "PARTITION `p_20210402` VALUES LESS THAN (2406), "
+                    "PARTITION `p_20210502` VALUES LESS THAN MAXVALUE);"
+                ]
+            },
+        )
