Skip to content

Handle mispredictions cleanly #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit on Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions partitionmanager/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def test_partition_cmd_noop(self):
"sql": (
"ALTER TABLE `testtable_noop` REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201205` VALUES LESS THAN (548), "
"PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
),
"noop": True,
}
Expand All @@ -101,8 +101,8 @@ def test_partition_cmd_final(self):
"sql": (
"ALTER TABLE `testtable_commit` REORGANIZE PARTITION "
"`p_20201204` INTO "
"(PARTITION `p_20201205` VALUES LESS THAN (548), "
"PARTITION `p_20210104` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20201112` VALUES LESS THAN (548), "
"PARTITION `p_20201212` VALUES LESS THAN MAXVALUE);"
),
}
},
Expand Down
57 changes: 44 additions & 13 deletions partitionmanager/table_append_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ def _predict_forward_time(current_position, end_position, rates, evaluation_time

if max(days_remaining) < 0:
raise ValueError(f"All values are negative: {days_remaining}")
return evaluation_time + (max(days_remaining) * timedelta(days=1))
calculated = evaluation_time + (max(days_remaining) * timedelta(days=1))
return calculated.replace(minute=0, second=0, microsecond=0)


def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan):
Expand All @@ -323,7 +324,7 @@ def _calculate_start_time(last_changed_time, evaluation_time, allowed_lifespan):
if partition_start_time < evaluation_time:
# Partition start times should never be in the past.
return evaluation_time
return partition_start_time
return partition_start_time.replace(minute=0, second=0, microsecond=0)


def _plan_partition_changes(
Expand Down Expand Up @@ -378,10 +379,13 @@ def _plan_partition_changes(
# to exclude the future-dated, irrelevant partition.
log.debug(
f"Misprediction: Evaluation time ({evaluation_time}) is "
f"before the active partition {active_partition}. Excluding from "
"rate calculations."
f"before the active partition {active_partition}. Excluding "
"mispredicted partitions from the rate calculations."
)
rate_relevant_partitions = filled_partitions + [
filled_partitions = filter(
lambda f: f.timestamp() < evaluation_time, filled_partitions
)
rate_relevant_partitions = list(filled_partitions) + [
partitionmanager.types.InstantPartition(evaluation_time, current_position)
]

Expand All @@ -403,16 +407,16 @@ def _plan_partition_changes(

changed_partition = partitionmanager.types.ChangePlannedPartition(partition)

start_of_fill_time = _predict_forward_time(
current_position, last_changed.position, rates, evaluation_time
)

if isinstance(partition, partitionmanager.types.PositionPartition):
# We can't change the position on this partition, but we can adjust
# the name to be more exact as to what date we expect it to begin
# filling. If we calculate the start-of-fill date and it doesn't
# match the partition's name, let's rename it and mark it as an
# important change.
start_of_fill_time = _predict_forward_time(
current_position, last_changed.position, rates, evaluation_time
)

if start_of_fill_time.date() != partition.timestamp().date():
log.info(
f"Start-of-fill predicted at {start_of_fill_time.date()} "
Expand All @@ -427,15 +431,21 @@ def _plan_partition_changes(
# we calculate forward what position we expect and use it in the
# future.

partition_start_time = _calculate_start_time(
nominal_partition_start_time = _calculate_start_time(
last_changed.timestamp(), evaluation_time, allowed_lifespan
)

# We use the nearest timestamp, which should generally be the
# calculated time, but could be the fill time based on predicting
# forward if we have gotten far off in our predictions in the past.
changed_partition.set_timestamp(
min(nominal_partition_start_time, start_of_fill_time)
)

changed_part_pos = _predict_forward_position(
last_changed.position.as_list(), rates, allowed_lifespan
)
changed_partition.set_position(changed_part_pos).set_timestamp(
partition_start_time
)
changed_partition.set_position(changed_part_pos)

results.append(changed_partition)

Expand All @@ -455,6 +465,27 @@ def _plan_partition_changes(
.set_timestamp(partition_start_time)
)

# Confirm we won't make timestamp conflicts
existing_timestamps = list(map(lambda p: p.timestamp(), partition_list))
conflict_found = True
while conflict_found:
conflict_found = False
for partition in results:
if partition.timestamp() in existing_timestamps:
if (
isinstance(partition, partitionmanager.types.ChangePlannedPartition)
and partition.timestamp() == partition.old.timestamp()
):
# That's not a conflict
continue

log.debug(
f"{partition} has a conflict for its timestamp, increasing by 1 day."
)
partition.set_timestamp(partition.timestamp() + timedelta(days=1))
conflict_found = True
break

# Final result is always MAXVALUE
results[-1].set_as_max_value()

Expand Down
33 changes: 18 additions & 15 deletions partitionmanager/table_append_partition_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,16 +565,16 @@ def test_plan_partition_changes_wildly_off_dates(self):
)

self.assertEqual(
planned,
[
ChangePlannedPartition(mkPPart("p_20201231", 100)),
ChangePlannedPartition(mkPPart("p_20210104", 200))
.set_timestamp(datetime(2021, 1, 2, tzinfo=timezone.utc))
.set_important(),
ChangePlannedPartition(mkTailPart("future")).set_timestamp(
datetime(2021, 1, 9, tzinfo=timezone.utc)
datetime(2021, 1, 5, tzinfo=timezone.utc)
),
],
planned,
)

def test_plan_partition_changes_long_delay(self):
Expand Down Expand Up @@ -604,6 +604,7 @@ def test_plan_partition_changes_long_delay(self):
)

def test_plan_partition_changes_short_names(self):
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_2019", 1912499867),
Expand Down Expand Up @@ -679,6 +680,7 @@ def test_plan_partition_changes_bespoke_names(self):
)

def test_plan_partition_changes(self):
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_20201231", 100),
Expand All @@ -697,7 +699,7 @@ def test_plan_partition_changes(self):
ChangePlannedPartition(mkPPart("p_20201231", 100)),
ChangePlannedPartition(mkPPart("p_20210102", 200)),
ChangePlannedPartition(mkTailPart("future")).set_timestamp(
datetime(2021, 1, 9, tzinfo=timezone.utc)
datetime(2021, 1, 4, tzinfo=timezone.utc)
),
],
)
Expand All @@ -718,19 +720,20 @@ def test_plan_partition_changes(self):
ChangePlannedPartition(mkPPart("p_20210102", 200)).set_position([200]),
ChangePlannedPartition(mkTailPart("future"))
.set_position([320])
.set_timestamp(datetime(2021, 1, 9, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 3, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_position([440])
.set_timestamp(datetime(2021, 1, 16, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 10, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_columns(1)
.set_timestamp(datetime(2021, 1, 23, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 1, 17, tzinfo=timezone.utc)),
],
)

def test_plan_partition_changes_misprediction(self):
""" We have to handle the case where the partition list doesn't cleanly
match reality. """
self.maxDiff = None
planned = _plan_partition_changes(
[
mkPPart("p_20210505", 9505010028),
Expand All @@ -748,15 +751,15 @@ def test_plan_partition_changes_misprediction(self):
planned,
[
ChangePlannedPartition(mkPPart("p_20210704", 10799505006)),
ChangePlannedPartition(mkTailPart("p_20210803")).set_position(
[11578057459]
),
ChangePlannedPartition(mkTailPart("p_20210803"))
.set_position([11578057459])
.set_timestamp(datetime(2021, 6, 28, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_position([12356609912])
.set_timestamp(datetime(2021, 9, 2, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 7, 28, tzinfo=timezone.utc)),
NewPlannedPartition()
.set_columns(1)
.set_timestamp(datetime(2021, 10, 2, tzinfo=timezone.utc)),
.set_timestamp(datetime(2021, 8, 27, tzinfo=timezone.utc)),
],
)

Expand Down Expand Up @@ -984,7 +987,7 @@ def test_plan_andgenerate_sql_reorganize_partition_commands_with_future_partitio
list(generate_sql_reorganize_partition_commands(Table("water"), planned)),
[
"ALTER TABLE `water` REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210109` VALUES LESS THAN MAXVALUE);",
"(PARTITION `p_20210105` VALUES LESS THAN MAXVALUE);",
"ALTER TABLE `water` REORGANIZE PARTITION `p_20210104` INTO "
"(PARTITION `p_20210102` VALUES LESS THAN (200));",
],
Expand Down Expand Up @@ -1046,9 +1049,9 @@ def test_get_pending_sql_reorganize_partition_commands_with_changes(self):
list(cmds),
[
"ALTER TABLE `plushies` REORGANIZE PARTITION `future` INTO "
"(PARTITION `p_20210109` VALUES LESS THAN (550), "
"PARTITION `p_20210116` VALUES LESS THAN (900), "
"PARTITION `p_20210123` VALUES LESS THAN MAXVALUE);"
"(PARTITION `p_20210104` VALUES LESS THAN (550), "
"PARTITION `p_20210111` VALUES LESS THAN (900), "
"PARTITION `p_20210118` VALUES LESS THAN MAXVALUE);"
],
)

Expand Down