Skip to content

Commit 2448fce

Browse files
committed
Let bootstrap work on tables that aren't partitioned yet.
Add a --assume-partitioned-on flag to bootstrap, to facilitate operation on tables that do not yet have a partition map. One needs to supply `--assume-partitioned-on COLUMN_NAME` as many times as needed to identify all the columns which will be part of the partition expression. The 'bootstrap' command now emits table-copy instructions The original plan for 'bootstrap' was to do live alterations, that they should only lock what they needed, however InnoDB likes to lock everything. So instead we need to always assume that "bootstrapping" will be a live table clone, and at their conclusion the team should perform an atomic rename.
1 parent 43922b5 commit 2448fce

File tree

8 files changed

+608
-17
lines changed

8 files changed

+608
-17
lines changed

partitionmanager/bootstrap.py

Lines changed: 119 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,18 @@
1616
MINIMUM_FUTURE_DELTA = timedelta(hours=2)
1717

1818

19+
def _override_config_to_map_data(conf):
20+
"""Return an analog to get_partition_map from override data in conf"""
21+
return {
22+
"range_cols": [str(x) for x in conf.assume_partitioned_on],
23+
"partitions": [
24+
partitionmanager.types.MaxValuePartition(
25+
"p_assumed", count=len(conf.assume_partitioned_on)
26+
)
27+
],
28+
}
29+
30+
1931
def write_state_info(conf, out_fp):
2032
"""
2133
Write the state info for tables defined in conf to the provided file-like
@@ -26,11 +38,15 @@ def write_state_info(conf, out_fp):
2638
log.info("Writing current state information")
2739
state_info = {"time": conf.curtime, "tables": dict()}
2840
for table in conf.tables:
29-
problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
30-
if problems:
31-
raise Exception("; ".join(problems))
41+
map_data = None
42+
if not conf.assume_partitioned_on:
43+
problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
44+
if problems:
45+
raise Exception("; ".join(problems))
46+
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
47+
else:
48+
map_data = _override_config_to_map_data(conf)
3249

33-
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
3450
positions = pm_tap.get_current_positions(
3551
conf.dbcmd, table, map_data["range_cols"]
3652
)
@@ -98,6 +114,83 @@ def _plan_partitions_for_time_offsets(
98114
return changes
99115

100116

117+
def _suffix(lines, *, indent="", mid_suffix="", final_suffix=""):
118+
""" Helper that suffixes each line with either mid- or final- suffix """
119+
for line, is_final in partitionmanager.tools.iter_show_end(lines):
120+
if is_final:
121+
yield indent + line + final_suffix
122+
else:
123+
yield indent + line + mid_suffix
124+
125+
126+
def _trigger_column_copies(cols):
127+
""" Helper that returns lines copying each column for a trigger. """
128+
for c in cols:
129+
yield f"`{c}` = NEW.`{c}`"
130+
131+
132+
def _generate_sql_copy_commands(
133+
existing_table, map_data, columns, new_table, alter_commands_iter
134+
):
135+
""" Generate a series of SQL commands to start a copy of the existing_table
136+
to a new_table, applying the supplied alterations before starting the
137+
triggers. """
138+
log = logging.getLogger(
139+
f"_generate_sql_copy_commands:{existing_table.name} to {new_table.name}"
140+
)
141+
142+
max_val_part = map_data["partitions"][-1]
143+
if not isinstance(max_val_part, partitionmanager.types.MaxValuePartition):
144+
log.error(f"Expected a MaxValue partition, got {max_val_part}")
145+
raise Exception("Unexpected part?")
146+
147+
range_id_string = ", ".join(map_data["range_cols"])
148+
149+
yield f"DROP TABLE IF EXISTS {new_table.name};"
150+
yield f"CREATE TABLE {new_table.name} LIKE {existing_table.name};"
151+
yield f"ALTER TABLE {new_table.name} REMOVE PARTITIONING;"
152+
yield f"ALTER TABLE {new_table.name} PARTITION BY RANGE({range_id_string}) ("
153+
yield f"\tPARTITION {max_val_part.name} VALUES LESS THAN MAXVALUE"
154+
yield ");"
155+
156+
for command in alter_commands_iter:
157+
yield command
158+
159+
cols = set(columns)
160+
161+
yield f"CREATE OR REPLACE TRIGGER copy_inserts_from_{existing_table.name}_to_{new_table.name}"
162+
yield f"\tAFTER INSERT ON {existing_table.name} FOR EACH ROW"
163+
yield f"\t\tINSERT INTO {new_table.name} SET"
164+
165+
for line in _suffix(
166+
_trigger_column_copies(sorted(cols)),
167+
indent="\t\t\t",
168+
mid_suffix=",",
169+
final_suffix=";",
170+
):
171+
yield line
172+
173+
update_columns = cols.difference(set(map_data["range_cols"]))
174+
if not update_columns:
175+
log.info("No columns to copy, so no UPDATE trigger being constructed.")
176+
return
177+
178+
yield f"CREATE OR REPLACE TRIGGER copy_updates_from_{existing_table.name}_to_{new_table.name}"
179+
yield f"\tAFTER UPDATE ON {existing_table.name} FOR EACH ROW"
180+
yield f"\t\tUPDATE {new_table.name} SET"
181+
182+
for line in _suffix(
183+
_trigger_column_copies(sorted(update_columns)), indent="\t\t\t", mid_suffix=","
184+
):
185+
yield line
186+
187+
yield "\t\tWHERE " + " AND ".join(
188+
_trigger_column_copies(map_data["range_cols"])
189+
) + ";"
190+
191+
return
192+
193+
101194
def calculate_sql_alters_from_state_info(conf, in_fp):
102195
"""
103196
Using the config and the input yaml file-like object, return the SQL
@@ -127,15 +220,22 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
127220
log.info(f"Skipping {table_name} as it is not in the current config")
128221
continue
129222

130-
problem = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
131-
if problem:
132-
raise Exception(problem)
223+
map_data = None
224+
225+
if not conf.assume_partitioned_on:
226+
problem = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
227+
if problem:
228+
raise Exception(problem)
229+
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
230+
else:
231+
map_data = _override_config_to_map_data(conf)
133232

134-
map_data = pm_tap.get_partition_map(conf.dbcmd, table)
135233
current_positions = pm_tap.get_current_positions(
136234
conf.dbcmd, table, map_data["range_cols"]
137235
)
138236

237+
columns = [r["Field"] for r in pm_tap.get_columns(conf.dbcmd, table)]
238+
139239
ordered_current_pos = [
140240
current_positions[name] for name in map_data["range_cols"]
141241
]
@@ -172,8 +272,18 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
172272
max_val_part,
173273
)
174274

275+
table_new = partitionmanager.types.Table(
276+
f"{table.name}_new_{conf.curtime:%Y%m%d}"
277+
)
278+
279+
alter_commands_iter = pm_tap.generate_sql_reorganize_partition_commands(
280+
table_new, changes
281+
)
282+
175283
commands[table.name] = list(
176-
pm_tap.generate_sql_reorganize_partition_commands(table, changes)
284+
_generate_sql_copy_commands(
285+
table, map_data, columns, table_new, alter_commands_iter
286+
)
177287
)
178288

179289
return commands

partitionmanager/bootstrap_test.py

Lines changed: 146 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
from datetime import datetime, timedelta
55

66
from .bootstrap import (
7+
_generate_sql_copy_commands,
78
_get_time_offsets,
9+
_suffix,
10+
_trigger_column_copies,
11+
_override_config_to_map_data,
812
calculate_sql_alters_from_state_info,
913
write_state_info,
1014
)
@@ -14,7 +18,8 @@
1418

1519
class MockDatabase(DatabaseCommand):
1620
def __init__(self):
17-
self.response = []
21+
self._response = list()
22+
self._select_response = [[{"id": 150}]]
1823
self.num_queries = 0
1924

2025
def run(self, cmd):
@@ -36,9 +41,9 @@ def run(self, cmd):
3641
]
3742

3843
if "SELECT" in cmd:
39-
return [{"id": 150}]
44+
return self._select_response.pop()
4045

41-
return self.response
46+
return self._response.pop()
4247

4348
def db_name(self):
4449
return SqlInput("the-database")
@@ -82,6 +87,7 @@ def test_get_time_offsets(self):
8287
)
8388

8489
def test_read_state_info(self):
90+
self.maxDiff = None
8591
conf_past = Config()
8692
conf_past.curtime = datetime(2021, 3, 1)
8793
conf_past.dbcmd = MockDatabase()
@@ -97,6 +103,12 @@ def test_read_state_info(self):
97103
conf_now = Config()
98104
conf_now.curtime = datetime(2021, 3, 3)
99105
conf_now.dbcmd = MockDatabase()
106+
conf_now.dbcmd._response = [
107+
[
108+
{"Field": "id", "Type": "bigint UNSIGNED"},
109+
{"Field": "serial", "Type": "varchar"},
110+
]
111+
]
100112
conf_now.tables = [Table("test").set_partition_period(timedelta(days=30))]
101113

102114
state_fs.seek(0)
@@ -105,10 +117,137 @@ def test_read_state_info(self):
105117
x,
106118
{
107119
"test": [
108-
"ALTER TABLE `test` REORGANIZE PARTITION `p_start` INTO "
109-
"(PARTITION `p_20210303` VALUES LESS THAN (156), "
110-
"PARTITION `p_20210402` VALUES LESS THAN (2406), "
111-
"PARTITION `p_20210502` VALUES LESS THAN MAXVALUE);"
120+
"DROP TABLE IF EXISTS test_new_20210303;",
121+
"CREATE TABLE test_new_20210303 LIKE test;",
122+
"ALTER TABLE test_new_20210303 REMOVE PARTITIONING;",
123+
"ALTER TABLE test_new_20210303 PARTITION BY RANGE(id) (",
124+
"\tPARTITION p_start VALUES LESS THAN MAXVALUE",
125+
");",
126+
"ALTER TABLE `test_new_20210303` REORGANIZE PARTITION `p_start` "
127+
+ "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
128+
+ "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
129+
+ "`p_20210502` VALUES LESS THAN MAXVALUE);",
130+
"CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303",
131+
"\tAFTER INSERT ON test FOR EACH ROW",
132+
"\t\tINSERT INTO test_new_20210303 SET",
133+
"\t\t\t`id` = NEW.`id`,",
134+
"\t\t\t`serial` = NEW.`serial`;",
135+
"CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303",
136+
"\tAFTER UPDATE ON test FOR EACH ROW",
137+
"\t\tUPDATE test_new_20210303 SET",
138+
"\t\t\t`serial` = NEW.`serial`",
139+
"\t\tWHERE `id` = NEW.`id`;",
112140
]
113141
},
114142
)
143+
144+
def test_read_state_info_map_table(self):
145+
self.maxDiff = None
146+
conf = Config()
147+
conf.assume_partitioned_on = ["order", "auth"]
148+
conf.curtime = datetime(2021, 3, 3)
149+
conf.dbcmd = MockDatabase()
150+
conf.dbcmd._select_response = [[{"auth": 22}], [{"order": 11}]]
151+
conf.dbcmd._response = [
152+
[
153+
{"Field": "order", "Type": "bigint UNSIGNED"},
154+
{"Field": "auth", "Type": "bigint UNSIGNED"},
155+
]
156+
]
157+
conf.tables = [Table("map_table").set_partition_period(timedelta(days=30))]
158+
159+
state_fs = io.StringIO()
160+
yaml.dump(
161+
{
162+
"tables": {"map_table": {"order": 11, "auth": 22}},
163+
"time": (conf.curtime - timedelta(days=1)),
164+
},
165+
state_fs,
166+
)
167+
state_fs.seek(0)
168+
169+
x = calculate_sql_alters_from_state_info(conf, state_fs)
170+
print(x)
171+
self.assertEqual(
172+
x,
173+
{
174+
"map_table": [
175+
"DROP TABLE IF EXISTS map_table_new_20210303;",
176+
"CREATE TABLE map_table_new_20210303 LIKE map_table;",
177+
"ALTER TABLE map_table_new_20210303 REMOVE PARTITIONING;",
178+
"ALTER TABLE map_table_new_20210303 PARTITION BY RANGE(order, auth) (",
179+
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
180+
");",
181+
"ALTER TABLE `map_table_new_20210303` REORGANIZE PARTITION "
182+
+ "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
183+
+ "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
184+
+ "(11, 22), PARTITION `p_20210502` VALUES LESS THAN "
185+
+ "MAXVALUE, MAXVALUE);",
186+
"CREATE OR REPLACE TRIGGER copy_inserts_from_map_table_"
187+
+ "to_map_table_new_20210303",
188+
"\tAFTER INSERT ON map_table FOR EACH ROW",
189+
"\t\tINSERT INTO map_table_new_20210303 SET",
190+
"\t\t\t`auth` = NEW.`auth`,",
191+
"\t\t\t`order` = NEW.`order`;",
192+
]
193+
},
194+
)
195+
196+
def test_trigger_column_copies(self):
197+
self.assertEqual(list(_trigger_column_copies([])), [])
198+
self.assertEqual(list(_trigger_column_copies(["a"])), ["`a` = NEW.`a`"])
199+
self.assertEqual(
200+
list(_trigger_column_copies(["b", "a", "c"])),
201+
["`b` = NEW.`b`", "`a` = NEW.`a`", "`c` = NEW.`c`"],
202+
)
203+
204+
def test_suffix(self):
205+
self.assertEqual(list(_suffix(["a"])), ["a"])
206+
self.assertEqual(list(_suffix(["a", "b"])), ["a", "b"])
207+
self.assertEqual(list(_suffix(["a", "b"], indent=" ")), [" a", " b"])
208+
self.assertEqual(list(_suffix(["a", "b"], mid_suffix=",")), ["a,", "b"])
209+
self.assertEqual(list(_suffix(["a", "b"], final_suffix=";")), ["a", "b;"])
210+
self.assertEqual(
211+
list(_suffix(["a", "b"], mid_suffix=",", final_suffix=";")), ["a,", "b;"]
212+
)
213+
214+
def test_generate_sql_copy_commands(self):
215+
conf = Config()
216+
conf.assume_partitioned_on = ["id"]
217+
conf.curtime = datetime(2021, 3, 3)
218+
conf.dbcmd = MockDatabase()
219+
map_data = _override_config_to_map_data(conf)
220+
cmds = list(
221+
_generate_sql_copy_commands(
222+
Table("old"),
223+
map_data,
224+
["id", "field"],
225+
Table("new"),
226+
["STRAIGHT_UP_INSERTED", "STUFF GOES HERE"],
227+
)
228+
)
229+
230+
print(cmds)
231+
self.assertEqual(
232+
cmds,
233+
[
234+
"DROP TABLE IF EXISTS new;",
235+
"CREATE TABLE new LIKE old;",
236+
"ALTER TABLE new REMOVE PARTITIONING;",
237+
"ALTER TABLE new PARTITION BY RANGE(id) (",
238+
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
239+
");",
240+
"STRAIGHT_UP_INSERTED",
241+
"STUFF GOES HERE",
242+
"CREATE OR REPLACE TRIGGER copy_inserts_from_old_to_new",
243+
"\tAFTER INSERT ON old FOR EACH ROW",
244+
"\t\tINSERT INTO new SET",
245+
"\t\t\t`field` = NEW.`field`,",
246+
"\t\t\t`id` = NEW.`id`;",
247+
"CREATE OR REPLACE TRIGGER copy_updates_from_old_to_new",
248+
"\tAFTER UPDATE ON old FOR EACH ROW",
249+
"\t\tUPDATE new SET",
250+
"\t\t\t`field` = NEW.`field`",
251+
"\t\tWHERE `id` = NEW.`id`;",
252+
],
253+
)

partitionmanager/cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def __init__(self):
5959
self.curtime = datetime.now(tz=timezone.utc)
6060
self.partition_period = timedelta(days=30)
6161
self.prometheus_stats_path = None
62+
self.assume_partitioned_on = None
6263

6364
def from_argparse(self, args):
6465
"""Populate this config from an argparse result.
@@ -80,6 +81,8 @@ def from_argparse(self, args):
8081
self.noop = args.noop
8182
if "prometheus_stats" in args:
8283
self.prometheus_stats_path = args.prometheus_stats
84+
if "assume_partitioned_on" in args:
85+
self.assume_partitioned_on = args.assume_partitioned_on
8386

8487
def from_yaml_file(self, file):
8588
"""Populate this config from the yaml in the file-like object supplied.
@@ -253,6 +256,13 @@ def bootstrap_cmd(args):
253256
nargs="+",
254257
help="table names, overwriting config",
255258
)
259+
BOOTSTRAP_PARSER.add_argument(
260+
"--assume-partitioned-on",
261+
type=partitionmanager.types.SqlInput,
262+
action="append",
263+
help="Assume tables are partitioned by this column name, can be specified "
264+
"multiple times for multi-column partitions",
265+
)
256266
BOOTSTRAP_PARSER.set_defaults(func=bootstrap_cmd)
257267

258268

0 commit comments

Comments
 (0)