Skip to content

Commit 1dee3a1

Browse files
committed
The 'bootstrap' command now emits table-copy instructions
The original plan for 'bootstrap' was to do live alterations, that they should only lock what they needed, however InnoDB likes to lock everything. So instead we need to always assume that "bootstrapping" will be a live table clone, and at their conclusion the team should perform an atomic rename.
1 parent cd9fb27 commit 1dee3a1

File tree

6 files changed

+383
-24
lines changed

6 files changed

+383
-24
lines changed

partitionmanager/bootstrap.py

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,18 @@
77
import logging
88
import operator
99
import yaml
10-
1110
from partitionmanager.types import (
1211
ChangePlannedPartition,
1312
MaxValuePartition,
1413
NewPlannedPartition,
14+
Table,
1515
)
1616
from partitionmanager.table_append_partition import (
17-
table_is_compatible,
17+
generate_sql_reorganize_partition_commands,
18+
get_columns,
1819
get_current_positions,
1920
get_partition_map,
20-
generate_sql_reorganize_partition_commands,
21+
table_is_compatible,
2122
)
2223
from .tools import iter_show_end
2324

@@ -89,7 +90,7 @@ def _plan_partitions_for_time_offsets(
8990
"""
9091
changes = list()
9192
for (i, offset), is_final in iter_show_end(enumerate(time_offsets)):
92-
increase = [x * offset / RATE_UNIT for x in rate_of_change]
93+
increase = [x * (offset / RATE_UNIT) for x in rate_of_change]
9394
predicted_positions = [
9495
int(p + i) for p, i in zip(ordered_current_pos, increase)
9596
]
@@ -115,6 +116,83 @@ def _plan_partitions_for_time_offsets(
115116
return changes
116117

117118

119+
def _suffix(lines, *, indent="", mid_suffix="", final_suffix=""):
120+
""" Helper that suffixes each line with either mid- or final- suffix """
121+
for line, is_final in iter_show_end(lines):
122+
if is_final:
123+
yield indent + line + final_suffix
124+
else:
125+
yield indent + line + mid_suffix
126+
127+
128+
def _trigger_column_copies(cols):
129+
""" Helper that returns lines copying each column for a trigger. """
130+
for c in cols:
131+
yield f"`{c}` = NEW.`{c}`"
132+
133+
134+
def _generate_sql_copy_commands(
135+
existing_table, map_data, columns, new_table, alter_commands_iter
136+
):
137+
""" Generate a series of SQL commands to start a copy of the existing_table
138+
to a new_table, applying the supplied alterations before starting the
139+
triggers. """
140+
log = logging.getLogger(
141+
f"_generate_sql_copy_commands:{existing_table.name} to {new_table.name}"
142+
)
143+
144+
max_val_part = map_data["partitions"][-1]
145+
if not isinstance(max_val_part, MaxValuePartition):
146+
log.error(f"Expected a MaxValue partition, got {max_val_part}")
147+
raise Exception("Unexpected part?")
148+
149+
range_id_string = ", ".join(map_data["range_cols"])
150+
151+
yield f"DROP TABLE IF EXISTS {new_table.name};"
152+
yield f"CREATE TABLE {new_table.name} LIKE {existing_table.name};"
153+
yield f"ALTER TABLE {new_table.name} REMOVE PARTITIONING;"
154+
yield f"ALTER TABLE {new_table.name} PARTITION BY RANGE({range_id_string}) ("
155+
yield f"\tPARTITION {max_val_part.name} VALUES LESS THAN MAXVALUE"
156+
yield ");"
157+
158+
for command in alter_commands_iter:
159+
yield command
160+
161+
cols = set(columns)
162+
163+
yield f"CREATE OR REPLACE TRIGGER copy_inserts_from_{existing_table.name}_to_{new_table.name}"
164+
yield f"\tAFTER INSERT ON {existing_table.name} FOR EACH ROW"
165+
yield f"\t\tINSERT INTO {new_table.name} SET"
166+
167+
for line in _suffix(
168+
_trigger_column_copies(sorted(cols)),
169+
indent="\t\t\t",
170+
mid_suffix=",",
171+
final_suffix=";",
172+
):
173+
yield line
174+
175+
update_columns = cols.difference(set(map_data["range_cols"]))
176+
if not update_columns:
177+
log.info("No columns to copy, so no UPDATE trigger being constructed.")
178+
return
179+
180+
yield f"CREATE OR REPLACE TRIGGER copy_updates_from_{existing_table.name}_to_{new_table.name}"
181+
yield f"\tAFTER UPDATE ON {existing_table.name} FOR EACH ROW"
182+
yield f"\t\tUPDATE {new_table.name} SET"
183+
184+
for line in _suffix(
185+
_trigger_column_copies(sorted(update_columns)), indent="\t\t\t", mid_suffix=","
186+
):
187+
yield line
188+
189+
yield "\t\tWHERE " + " AND ".join(
190+
_trigger_column_copies(map_data["range_cols"])
191+
) + ";"
192+
193+
return
194+
195+
118196
def calculate_sql_alters_from_state_info(conf, in_fp):
119197
"""
120198
Using the config and the input yaml file-like object, return the SQL
@@ -154,6 +232,8 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
154232
else:
155233
map_data = _override_config_to_map_data(conf)
156234

235+
columns = [r["Field"] for r in get_columns(conf.dbcmd, table)]
236+
157237
current_positions = get_current_positions(
158238
conf.dbcmd, table, map_data["range_cols"]
159239
)
@@ -194,8 +274,16 @@ def calculate_sql_alters_from_state_info(conf, in_fp):
194274
max_val_part,
195275
)
196276

277+
table_new = Table(f"{table.name}_new_{conf.curtime:%Y%m%d}")
278+
279+
alter_commands_iter = generate_sql_reorganize_partition_commands(
280+
table_new, changes
281+
)
282+
197283
commands[table.name] = list(
198-
generate_sql_reorganize_partition_commands(table, changes)
284+
_generate_sql_copy_commands(
285+
table, map_data, columns, table_new, alter_commands_iter
286+
)
199287
)
200288

201289
return commands

partitionmanager/bootstrap_test.py

Lines changed: 146 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
from datetime import datetime, timedelta
55

66
from .bootstrap import (
7+
_generate_sql_copy_commands,
78
_get_time_offsets,
9+
_suffix,
10+
_trigger_column_copies,
11+
_override_config_to_map_data,
812
calculate_sql_alters_from_state_info,
913
write_state_info,
1014
)
@@ -14,7 +18,8 @@
1418

1519
class MockDatabase(DatabaseCommand):
1620
def __init__(self):
17-
self.response = []
21+
self._response = list()
22+
self._select_response = [[{"id": 150}]]
1823
self.num_queries = 0
1924

2025
def run(self, cmd):
@@ -36,9 +41,9 @@ def run(self, cmd):
3641
]
3742

3843
if "SELECT" in cmd:
39-
return [{"id": 150}]
44+
return self._select_response.pop()
4045

41-
return self.response
46+
return self._response.pop()
4247

4348
def db_name(self):
4449
return SqlInput("the-database")
@@ -82,6 +87,7 @@ def test_get_time_offsets(self):
8287
)
8388

8489
def test_read_state_info(self):
90+
self.maxDiff = None
8591
conf_past = Config()
8692
conf_past.curtime = datetime(2021, 3, 1)
8793
conf_past.dbcmd = MockDatabase()
@@ -97,6 +103,12 @@ def test_read_state_info(self):
97103
conf_now = Config()
98104
conf_now.curtime = datetime(2021, 3, 3)
99105
conf_now.dbcmd = MockDatabase()
106+
conf_now.dbcmd._response = [
107+
[
108+
{"Field": "id", "Type": "bigint UNSIGNED"},
109+
{"Field": "serial", "Type": "varchar"},
110+
]
111+
]
100112
conf_now.tables = [Table("test").set_partition_period(timedelta(days=30))]
101113

102114
state_fs.seek(0)
@@ -105,10 +117,137 @@ def test_read_state_info(self):
105117
x,
106118
{
107119
"test": [
108-
"ALTER TABLE `test` REORGANIZE PARTITION `p_start` INTO "
109-
"(PARTITION `p_20210303` VALUES LESS THAN (156), "
110-
"PARTITION `p_20210402` VALUES LESS THAN (2406), "
111-
"PARTITION `p_20210502` VALUES LESS THAN MAXVALUE);"
120+
"DROP TABLE IF EXISTS test_new_20210303;",
121+
"CREATE TABLE test_new_20210303 LIKE test;",
122+
"ALTER TABLE test_new_20210303 REMOVE PARTITIONING;",
123+
"ALTER TABLE test_new_20210303 PARTITION BY RANGE(id) (",
124+
"\tPARTITION p_start VALUES LESS THAN MAXVALUE",
125+
");",
126+
"ALTER TABLE `test_new_20210303` REORGANIZE PARTITION `p_start` "
127+
+ "INTO (PARTITION `p_20210303` VALUES LESS THAN (156), "
128+
+ "PARTITION `p_20210402` VALUES LESS THAN (2406), PARTITION "
129+
+ "`p_20210502` VALUES LESS THAN MAXVALUE);",
130+
"CREATE OR REPLACE TRIGGER copy_inserts_from_test_to_test_new_20210303",
131+
"\tAFTER INSERT ON test FOR EACH ROW",
132+
"\t\tINSERT INTO test_new_20210303 SET",
133+
"\t\t\t`id` = NEW.`id`,",
134+
"\t\t\t`serial` = NEW.`serial`;",
135+
"CREATE OR REPLACE TRIGGER copy_updates_from_test_to_test_new_20210303",
136+
"\tAFTER UPDATE ON test FOR EACH ROW",
137+
"\t\tUPDATE test_new_20210303 SET",
138+
"\t\t\t`serial` = NEW.`serial`",
139+
"\t\tWHERE `id` = NEW.`id`;",
112140
]
113141
},
114142
)
143+
144+
def test_read_state_info_map_table(self):
145+
self.maxDiff = None
146+
conf = Config()
147+
conf.assume_partitioned_on = ["order", "auth"]
148+
conf.curtime = datetime(2021, 3, 3)
149+
conf.dbcmd = MockDatabase()
150+
conf.dbcmd._select_response = [[{"auth": 22}], [{"order": 11}]]
151+
conf.dbcmd._response = [
152+
[
153+
{"Field": "order", "Type": "bigint UNSIGNED"},
154+
{"Field": "auth", "Type": "bigint UNSIGNED"},
155+
]
156+
]
157+
conf.tables = [Table("map_table").set_partition_period(timedelta(days=30))]
158+
159+
state_fs = io.StringIO()
160+
yaml.dump(
161+
{
162+
"tables": {"map_table": {"order": 11, "auth": 22}},
163+
"time": (conf.curtime - timedelta(days=1)),
164+
},
165+
state_fs,
166+
)
167+
state_fs.seek(0)
168+
169+
x = calculate_sql_alters_from_state_info(conf, state_fs)
170+
print(x)
171+
self.assertEqual(
172+
x,
173+
{
174+
"map_table": [
175+
"DROP TABLE IF EXISTS map_table_new_20210303;",
176+
"CREATE TABLE map_table_new_20210303 LIKE map_table;",
177+
"ALTER TABLE map_table_new_20210303 REMOVE PARTITIONING;",
178+
"ALTER TABLE map_table_new_20210303 PARTITION BY RANGE(order, auth) (",
179+
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
180+
");",
181+
"ALTER TABLE `map_table_new_20210303` REORGANIZE PARTITION "
182+
+ "`p_assumed` INTO (PARTITION `p_20210303` VALUES LESS THAN "
183+
+ "(11, 22), PARTITION `p_20210402` VALUES LESS THAN "
184+
+ "(11, 22), PARTITION `p_20210502` VALUES LESS THAN "
185+
+ "MAXVALUE, MAXVALUE);",
186+
"CREATE OR REPLACE TRIGGER copy_inserts_from_map_table_"
187+
+ "to_map_table_new_20210303",
188+
"\tAFTER INSERT ON map_table FOR EACH ROW",
189+
"\t\tINSERT INTO map_table_new_20210303 SET",
190+
"\t\t\t`auth` = NEW.`auth`,",
191+
"\t\t\t`order` = NEW.`order`;",
192+
]
193+
},
194+
)
195+
196+
def test_trigger_column_copies(self):
197+
self.assertEqual(list(_trigger_column_copies([])), [])
198+
self.assertEqual(list(_trigger_column_copies(["a"])), ["`a` = NEW.`a`"])
199+
self.assertEqual(
200+
list(_trigger_column_copies(["b", "a", "c"])),
201+
["`b` = NEW.`b`", "`a` = NEW.`a`", "`c` = NEW.`c`"],
202+
)
203+
204+
def test_suffix(self):
205+
self.assertEqual(list(_suffix(["a"])), ["a"])
206+
self.assertEqual(list(_suffix(["a", "b"])), ["a", "b"])
207+
self.assertEqual(list(_suffix(["a", "b"], indent=" ")), [" a", " b"])
208+
self.assertEqual(list(_suffix(["a", "b"], mid_suffix=",")), ["a,", "b"])
209+
self.assertEqual(list(_suffix(["a", "b"], final_suffix=";")), ["a", "b;"])
210+
self.assertEqual(
211+
list(_suffix(["a", "b"], mid_suffix=",", final_suffix=";")), ["a,", "b;"]
212+
)
213+
214+
def test_generate_sql_copy_commands(self):
215+
conf = Config()
216+
conf.assume_partitioned_on = ["id"]
217+
conf.curtime = datetime(2021, 3, 3)
218+
conf.dbcmd = MockDatabase()
219+
map_data = _override_config_to_map_data(conf)
220+
cmds = list(
221+
_generate_sql_copy_commands(
222+
Table("old"),
223+
map_data,
224+
["id", "field"],
225+
Table("new"),
226+
["STRAIGHT_UP_INSERTED", "STUFF GOES HERE"],
227+
)
228+
)
229+
230+
print(cmds)
231+
self.assertEqual(
232+
cmds,
233+
[
234+
"DROP TABLE IF EXISTS new;",
235+
"CREATE TABLE new LIKE old;",
236+
"ALTER TABLE new REMOVE PARTITIONING;",
237+
"ALTER TABLE new PARTITION BY RANGE(id) (",
238+
"\tPARTITION p_assumed VALUES LESS THAN MAXVALUE",
239+
");",
240+
"STRAIGHT_UP_INSERTED",
241+
"STUFF GOES HERE",
242+
"CREATE OR REPLACE TRIGGER copy_inserts_from_old_to_new",
243+
"\tAFTER INSERT ON old FOR EACH ROW",
244+
"\t\tINSERT INTO new SET",
245+
"\t\t\t`field` = NEW.`field`,",
246+
"\t\t\t`id` = NEW.`id`;",
247+
"CREATE OR REPLACE TRIGGER copy_updates_from_old_to_new",
248+
"\tAFTER UPDATE ON old FOR EACH ROW",
249+
"\t\tUPDATE new SET",
250+
"\t\t\t`field` = NEW.`field`",
251+
"\t\tWHERE `id` = NEW.`id`;",
252+
],
253+
)

0 commit comments

Comments
 (0)