
Commit 6cf345d

Let bootstrap work on tables that aren't partitioned yet.
Add a --assume-partitioned-on flag to bootstrap, to facilitate operation on tables that do not yet have a partition map. Supply `--assume-partitioned-on COLUMN_NAME` once for each column that will be part of the partition expression.

The 'bootstrap' command now emits table-copy instructions. The original plan for 'bootstrap' was to perform live alterations, on the assumption that they would only lock what they needed; however, InnoDB likes to lock everything. So instead we always assume that "bootstrapping" means a live table clone, at the conclusion of which the operator performs an atomic rename.

Also in this commit:
- Fix a unit test to not depend on the time of day (oops).
- Catch an arithmetic error.
- Emit output lines with MySQL comment characters.
1 parent e48ec1a commit 6cf345d
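
The flag is repeatable: passing `--assume-partitioned-on order --assume-partitioned-on auth` tells bootstrap to treat `order` and `auth` as the future partition expression, as exercised by the new test_read_state_info_map_table test. A minimal sketch of that repeated-flag collection (assuming an append-style argparse action; the actual wiring in partitionmanager/cli.py is not shown in this diff):

    # Sketch only: how a repeatable --assume-partitioned-on flag could be parsed.
    # The column names "order" and "auth" are taken from the new unit test.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--assume-partitioned-on", action="append")
    conf = parser.parse_args(
        ["--assume-partitioned-on", "order", "--assume-partitioned-on", "auth"]
    )
    assert conf.assume_partitioned_on == ["order", "auth"]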

8 files changed: +632, -20 lines

partitionmanager/bootstrap.py

Lines changed: 120 additions & 10 deletions
@@ -16,6 +16,18 @@
 MINIMUM_FUTURE_DELTA = timedelta(hours=2)
 
 
+def _override_config_to_map_data(conf):
+    """Return an analog to get_partition_map from override data in conf"""
+    return {
+        "range_cols": [str(x) for x in conf.assume_partitioned_on],
+        "partitions": [
+            partitionmanager.types.MaxValuePartition(
+                "p_assumed", count=len(conf.assume_partitioned_on)
+            )
+        ],
+    }
+
+
 def write_state_info(conf, out_fp):
     """
     Write the state info for tables defined in conf to the provided file-like
@@ -26,11 +38,15 @@ def write_state_info(conf, out_fp):
     log.info("Writing current state information")
     state_info = {"time": conf.curtime, "tables": dict()}
     for table in conf.tables:
-        problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
-        if problems:
-            raise Exception("; ".join(problems))
+        map_data = None
+        if not conf.assume_partitioned_on:
+            problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
+            if problems:
+                raise Exception("; ".join(problems))
+            map_data = pm_tap.get_partition_map(conf.dbcmd, table)
+        else:
+            map_data = _override_config_to_map_data(conf)
 
-        map_data = pm_tap.get_partition_map(conf.dbcmd, table)
         positions = pm_tap.get_current_positions(
             conf.dbcmd, table, map_data["range_cols"]
         )
@@ -73,7 +89,7 @@ def _plan_partitions_for_time_offsets(
     for (i, offset), is_final in partitionmanager.tools.iter_show_end(
         enumerate(time_offsets)
     ):
-        increase = [x * offset / RATE_UNIT for x in rate_of_change]
+        increase = [x * (offset / RATE_UNIT) for x in rate_of_change]
         predicted_positions = [
            int(p + i) for p, i in zip(ordered_current_pos, increase)
         ]
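
This is presumably the arithmetic error the commit message mentions: grouping `(offset / RATE_UNIT)` divides two timedeltas first, yielding a plain float, so a very large rate is never multiplied into an intermediate timedelta (which has a bounded range). A small sketch, under the assumption that RATE_UNIT is a one-hour timedelta (the rate value is taken from the new test_plan_partitions_for_time_offsets test):

    # Sketch: why the grouping matters. Values are illustrative.
    from datetime import timedelta

    RATE_UNIT = timedelta(hours=1)  # assumption; the module's actual unit may differ
    offset = timedelta(days=360)
    rate = 16753227640  # rate-of-change used in the new unit test

    # rate * offset would try to build a timedelta of ~6e12 days -> OverflowError;
    # dividing the timedeltas first keeps the arithmetic in plain numbers.
    increase = rate * (offset / RATE_UNIT)  # offset / RATE_UNIT == 8640.0
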
@@ -101,6 +117,83 @@ def _plan_partitions_for_time_offsets(
     return changes
 
 
+def _suffix(lines, *, indent="", mid_suffix="", final_suffix=""):
+    """ Helper that suffixes each line with either mid- or final- suffix """
+    for line, is_final in partitionmanager.tools.iter_show_end(lines):
+        if is_final:
+            yield indent + line + final_suffix
+        else:
+            yield indent + line + mid_suffix
+
+
+def _trigger_column_copies(cols):
+    """ Helper that returns lines copying each column for a trigger. """
+    for c in cols:
+        yield f"`{c}` = NEW.`{c}`"
+
+
+def _generate_sql_copy_commands(
+    existing_table, map_data, columns, new_table, alter_commands_iter
+):
+    """ Generate a series of SQL commands to start a copy of the existing_table
+    to a new_table, applying the supplied alterations before starting the
+    triggers. """
+    log = logging.getLogger(
+        f"_generate_sql_copy_commands:{existing_table.name} to {new_table.name}"
+    )
+
+    max_val_part = map_data["partitions"][-1]
+    if not isinstance(max_val_part, partitionmanager.types.MaxValuePartition):
+        log.error(f"Expected a MaxValue partition, got {max_val_part}")
+        raise Exception("Unexpected part?")
+
+    range_id_string = ", ".join(map_data["range_cols"])
+
+    yield f"DROP TABLE IF EXISTS {new_table.name};"
+    yield f"CREATE TABLE {new_table.name} LIKE {existing_table.name};"
+    yield f"ALTER TABLE {new_table.name} REMOVE PARTITIONING;"
+    yield f"ALTER TABLE {new_table.name} PARTITION BY RANGE({range_id_string}) ("
+    yield f"\tPARTITION {max_val_part.name} VALUES LESS THAN MAXVALUE"
+    yield ");"
+
+    for command in alter_commands_iter:
+        yield command
+
+    cols = set(columns)
+
+    yield f"CREATE OR REPLACE TRIGGER copy_inserts_from_{existing_table.name}_to_{new_table.name}"
+    yield f"\tAFTER INSERT ON {existing_table.name} FOR EACH ROW"
+    yield f"\t\tINSERT INTO {new_table.name} SET"
+
+    for line in _suffix(
+        _trigger_column_copies(sorted(cols)),
+        indent="\t\t\t",
+        mid_suffix=",",
+        final_suffix=";",
+    ):
+        yield line
+
+    update_columns = cols.difference(set(map_data["range_cols"]))
+    if not update_columns:
+        log.info("No columns to copy, so no UPDATE trigger being constructed.")
+        return
+
+    yield f"CREATE OR REPLACE TRIGGER copy_updates_from_{existing_table.name}_to_{new_table.name}"
+    yield f"\tAFTER UPDATE ON {existing_table.name} FOR EACH ROW"
+    yield f"\t\tUPDATE {new_table.name} SET"
+
+    for line in _suffix(
+        _trigger_column_copies(sorted(update_columns)), indent="\t\t\t", mid_suffix=","
+    ):
+        yield line
+
+    yield "\t\tWHERE " + " AND ".join(
+        _trigger_column_copies(map_data["range_cols"])
+    ) + ";"
+
+    return
+
+
 def calculate_sql_alters_from_state_info(conf, in_fp):
     """
     Using the config and the input yaml file-like object, return the SQL
@@ -130,15 +223,22 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
             log.info(f"Skipping {table_name} as it is not in the current config")
             continue
 
-        problem = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
-        if problem:
-            raise Exception(problem)
+        map_data = None
+
+        if not conf.assume_partitioned_on:
+            problem = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
+            if problem:
+                raise Exception(problem)
+            map_data = pm_tap.get_partition_map(conf.dbcmd, table)
+        else:
+            map_data = _override_config_to_map_data(conf)
 
-        map_data = pm_tap.get_partition_map(conf.dbcmd, table)
         current_positions = pm_tap.get_current_positions(
             conf.dbcmd, table, map_data["range_cols"]
         )
 
+        columns = [r["Field"] for r in pm_tap.get_columns(conf.dbcmd, table)]
+
         ordered_current_pos = [
             current_positions[name] for name in map_data["range_cols"]
         ]
@@ -178,7 +278,17 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
             max_val_part,
         )
 
+        table_new = partitionmanager.types.Table(
+            f"{table.name}_new_{conf.curtime:%Y%m%d}"
+        )
+
+        alter_commands_iter = pm_tap.generate_sql_reorganize_partition_commands(
+            table_new, changes
+        )
+
         commands[table.name] = list(
-            pm_tap.generate_sql_reorganize_partition_commands(table, changes)
+            _generate_sql_copy_commands(
+                table, map_data, columns, table_new, alter_commands_iter
+            )
         )
     return commands
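
The emitted statements only set up the clone and the syncing triggers; per the commit message, the operator finishes the bootstrap with an atomic rename once the copy has caught up. That final step is not part of this commit, but a minimal sketch in the same yield-SQL-strings style (table and trigger names are illustrative) could look like:

    def _generate_sql_swap_commands(existing_name, new_name, archive_name):
        """Sketch only: cut over to the clone with one atomic RENAME TABLE,
        then drop the copy triggers that were keeping it in sync."""
        # MariaDB/MySQL rename every table listed in one RENAME TABLE atomically.
        yield (
            f"RENAME TABLE {existing_name} TO {archive_name}, "
            f"{new_name} TO {existing_name};"
        )
        yield f"DROP TRIGGER IF EXISTS copy_inserts_from_{existing_name}_to_{new_name};"
        yield f"DROP TRIGGER IF EXISTS copy_updates_from_{existing_name}_to_{new_name};"

    # e.g. list(_generate_sql_swap_commands("orders", "orders_new_20210303", "orders_old"))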

partitionmanager/bootstrap_test.py

Lines changed: 167 additions & 8 deletions
@@ -4,17 +4,30 @@
 from datetime import datetime, timedelta
 
 from .bootstrap import (
+    _generate_sql_copy_commands,
     _get_time_offsets,
+    _suffix,
+    _trigger_column_copies,
+    _override_config_to_map_data,
+    _plan_partitions_for_time_offsets,
     calculate_sql_alters_from_state_info,
     write_state_info,
 )
 from .cli import Config
-from .types import DatabaseCommand, Table, SqlInput
+from .types import (
+    DatabaseCommand,
+    Table,
+    SqlInput,
+    MaxValuePartition,
+    ChangePlannedPartition,
+    NewPlannedPartition,
+)
 
 
 class MockDatabase(DatabaseCommand):
     def __init__(self):
-        self.response = []
+        self._response = list()
+        self._select_response = [[{"id": 150}]]
         self.num_queries = 0
 
     def run(self, cmd):
@@ -36,8 +49,9 @@ def run(self, cmd):
             ]
 
         if "SELECT" in cmd:
-            return [{"id": 150}]
-        return self.response
+            return self._select_response.pop()
+
+        return self._response.pop()
 
     def db_name(self):
         return SqlInput("the-database")
@@ -81,6 +95,7 @@ def test_get_time_offsets(self):
         )
 
     def test_read_state_info(self):
+        self.maxDiff = None
         conf_past = Config()
         conf_past.curtime = datetime(2021, 3, 1)
         conf_past.dbcmd = MockDatabase()
@@ -96,6 +111,12 @@ def test_read_state_info(self):
         conf_now = Config()
         conf_now.curtime = datetime(2021, 3, 3)
         conf_now.dbcmd = MockDatabase()
+        conf_now.dbcmd._response = [
+            [
+                {"Field": "id", "Type": "bigint UNSIGNED"},
+                {"Field": "serial", "Type": "varchar"},
+            ]
+        ]
         conf_now.tables = [Table("test").set_partition_period(timedelta(days=30))]
 
         state_fs.seek(0)
@@ -104,10 +125,148 @@ def test_read_state_info(self):
             x,
             {
                 "test": [
-                    "ALTER TABLE `test` REORGANIZE PARTITION `p_start` INTO "
-                    "(PARTITION `p_20210303` VALUES LESS THAN (156), "
-                    "PARTITION `p_20210402` VALUES LESS THAN (2406), "
-                    "PARTITION `p_20210502` VALUES LESS THAN MAXVALUE);"
+                    "DROP TABLE IF EXISTS test_new_20210303;",
+                    "CREATE TABLE test_new_20210303 LIKE test;",
+                    "ALTER TABLE test_new_20210303 REMOVE PARTITIONING;",
+                    "ALTER TABLE test_new_20210303 PARTITION BY RANGE(id) (",
+                    "\tPARTITION p_start VALUES LESS THAN MAXVALUE",
+                    ");",
+                    "ALTER TABLE `test_new_20210303` REORGANIZE PARTITION `p_start` "
+                    + "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
+                    + "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
+                    + "`p_20210502` VALUES LESS THAN MAXVALUE);",
+                    "CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303",
+                    "\tAFTER INSERT ON test FOR EACH ROW",
+                    "\t\tINSERT INTO test_new_20210303 SET",
+                    "\t\t\t`id` = NEW.`id`,",
+                    "\t\t\t`serial` = NEW.`serial`;",
+                    "CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303",
+                    "\tAFTER UPDATE ON test FOR EACH ROW",
+                    "\t\tUPDATE test_new_20210303 SET",
+                    "\t\t\t`serial` = NEW.`serial`",
+                    "\t\tWHERE `id` = NEW.`id`;",
                 ]
             },
         )
+
+    def test_read_state_info_map_table(self):
+        self.maxDiff = None
+        conf = Config()
+        conf.assume_partitioned_on = ["order", "auth"]
+        conf.curtime = datetime(2021, 3, 3)
+        conf.dbcmd = MockDatabase()
+        conf.dbcmd._select_response = [[{"auth": 22}], [{"order": 11}]]
+        conf.dbcmd._response = [
+            [
+                {"Field": "order", "Type": "bigint UNSIGNED"},
+                {"Field": "auth", "Type": "bigint UNSIGNED"},
+            ]
+        ]
+        conf.tables = [Table("map_table").set_partition_period(timedelta(days=30))]
+
+        state_fs = io.StringIO()
+        yaml.dump(
+            {
+                "tables": {"map_table": {"order": 11, "auth": 22}},
+                "time": (conf.curtime - timedelta(days=1)),
+            },
+            state_fs,
+        )
+        state_fs.seek(0)
+
+        x = calculate_sql_alters_from_state_info(conf, state_fs)
+        print(x)
+        self.assertEqual(
+            x,
+            {
+                "map_table": [
+                    "DROP TABLE IF EXISTS map_table_new_20210303;",
+                    "CREATE TABLE map_table_new_20210303 LIKE map_table;",
+                    "ALTER TABLE map_table_new_20210303 REMOVE PARTITIONING;",
+                    "ALTER TABLE map_table_new_20210303 PARTITION BY RANGE(order, auth) (",
+                    "\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
+                    ");",
+                    "ALTER TABLE `map_table_new_20210303` REORGANIZE PARTITION "
+                    + "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
+                    + "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
+                    + "(11, 22), PARTITION `p_20210502` VALUES LESS THAN "
+                    + "MAXVALUE, MAXVALUE);",
+                    "CREATE OR REPLACE TRIGGER copy_inserts_from_map_table_"
+                    + "to_map_table_new_20210303",
+                    "\tAFTER INSERT ON map_table FOR EACH ROW",
+                    "\t\tINSERT INTO map_table_new_20210303 SET",
+                    "\t\t\t`auth` = NEW.`auth`,",
+                    "\t\t\t`order` = NEW.`order`;",
+                ]
+            },
+        )
+
+    def test_trigger_column_copies(self):
+        self.assertEqual(list(_trigger_column_copies([])), [])
+        self.assertEqual(list(_trigger_column_copies(["a"])), ["`a` = NEW.`a`"])
+        self.assertEqual(
+            list(_trigger_column_copies(["b", "a", "c"])),
+            ["`b` = NEW.`b`", "`a` = NEW.`a`", "`c` = NEW.`c`"],
+        )
+
+    def test_suffix(self):
+        self.assertEqual(list(_suffix(["a"])), ["a"])
+        self.assertEqual(list(_suffix(["a", "b"])), ["a", "b"])
+        self.assertEqual(list(_suffix(["a", "b"], indent=" ")), [" a", " b"])
+        self.assertEqual(list(_suffix(["a", "b"], mid_suffix=",")), ["a,", "b"])
+        self.assertEqual(list(_suffix(["a", "b"], final_suffix=";")), ["a", "b;"])
+        self.assertEqual(
+            list(_suffix(["a", "b"], mid_suffix=",", final_suffix=";")), ["a,", "b;"]
+        )
+
+    def test_generate_sql_copy_commands(self):
+        conf = Config()
+        conf.assume_partitioned_on = ["id"]
+        conf.curtime = datetime(2021, 3, 3)
+        conf.dbcmd = MockDatabase()
+        map_data = _override_config_to_map_data(conf)
+        cmds = list(
+            _generate_sql_copy_commands(
+                Table("old"),
+                map_data,
+                ["id", "field"],
+                Table("new"),
+                ["STRAIGHT_UP_INSERTED", "STUFF GOES HERE"],
+            )
+        )
+
+        print(cmds)
+        self.assertEqual(
+            cmds,
+            [
+                "DROP TABLE IF EXISTS new;",
+                "CREATE TABLE new LIKE old;",
+                "ALTER TABLE new REMOVE PARTITIONING;",
+                "ALTER TABLE new PARTITION BY RANGE(id) (",
+                "\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
+                ");",
+                "STRAIGHT_UP_INSERTED",
+                "STUFF GOES HERE",
+                "CREATE OR REPLACE TRIGGER copy_inserts_from_old_to_new",
+                "\tAFTER INSERT ON old FOR EACH ROW",
+                "\t\tINSERT INTO new SET",
+                "\t\t\t`field` = NEW.`field`,",
+                "\t\t\t`id` = NEW.`id`;",
+                "CREATE OR REPLACE TRIGGER copy_updates_from_old_to_new",
+                "\tAFTER UPDATE ON old FOR EACH ROW",
+                "\t\tUPDATE new SET",
+                "\t\t\t`field` = NEW.`field`",
+                "\t\tWHERE `id` = NEW.`id`;",
+            ],
+        )
+
+    def test_plan_partitions_for_time_offsets(self):
+        parts = _plan_partitions_for_time_offsets(
+            datetime(2021, 3, 3),
+            [timedelta(days=60), timedelta(days=360)],
+            [11943234],
+            [16753227640],
+            MaxValuePartition("p_assumed", count=1),
+        )
+        self.assertIsInstance(parts[0], ChangePlannedPartition)
+        self.assertIsInstance(parts[1], NewPlannedPartition)

0 commit comments
