Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
### Changed

- Requires Python 3.6+ to match Chaos Toolkit itself
- Added a flag to the safeguards control so that it waits for the activity
to fail before it interrupts the execution gracefully. This should not
requires signals nor threading interruptions and therefore be safer [#5][5]

[5]: https://github.com/chaostoolkit/chaostoolkit-addons/issues/5

## [0.2.0][]

Expand Down
117 changes: 90 additions & 27 deletions chaosaddons/controls/safeguards.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,56 +78,74 @@
means that while the experiment has ended, your probe could be not returning
and therefore blocking the process. Make sure your probe do not make blocking
calls for too long.

The safeguard may take an extra boolean argument, `interrupt_after_activity`,
that, when set to `true`, requests that the experiment only gets interrupted
after the current activity rather than immediatly. This can be better in cases
where trying to exit during a blocking activity may lead to random behaviors
due to how Python behaves. With this flag, no signals are emitted and threads
are not arbitrarely interrupted.
see: https://github.com/chaostoolkit/chaostoolkit/issues/210

"""
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from functools import partial
import threading
import time
import traceback
from typing import List
from typing import List, Optional

from logzero import logger

from chaoslib.activity import run_activity
from chaoslib.caching import lookup_activity
from chaoslib.control import controls
from chaoslib.exceptions import ActivityFailed
from chaoslib.exceptions import ActivityFailed, InterruptExecution
from chaoslib.exit import exit_gracefully
from chaoslib.hypothesis import within_tolerance
from chaoslib.types import Configuration, \
from chaoslib.types import Activity, Configuration, \
Experiment, Probe, Run, Secrets, Settings


from .synchronization import experiment_finished

guardian_lock = threading.Lock()

class Guardian(threading.local):

class State:
def __init__(self) -> None:
self._lock = threading.Lock()
self._interrupted = False

@property
def interrupted(self) -> bool:
"""
Flag that says one of our safeguards raised an execution interruption
"""
with self._lock:
with guardian_lock:
return self._interrupted

@interrupted.setter
def interrupted(self, value: bool) -> None:
"""
Set the interruption flag on.
"""
with self._lock:
with guardian_lock:
self._interrupted = value

def prepare(self, probes: List[Probe]) -> None:

class Guardian(threading.local):
def __init__(self) -> None:
self._interrupt_after_activity = None

def prepare(self, probes: List[Probe],
interrupt_after_activity: Optional[bool] = None) -> None:
"""
Configure the guardian so that it runs with the right amount of
resources.
"""
self._interrupt_after_activity = interrupt_after_activity

once_count = 0
repeating_count = 0
now_count = 0
Expand All @@ -146,6 +164,7 @@ def prepare(self, probes: List[Probe]) -> None:
self.repeating = ThreadPoolExecutor(max_workers=repeating_count or 1)

def run(self, experiment: Experiment, probes: List[Probe],
interrupt_after_activity: Optional[bool],
configuration: Configuration, secrets: Secrets,
settings: Settings) -> None:
"""
Expand All @@ -160,17 +179,23 @@ def run(self, experiment: Experiment, probes: List[Probe],
if p.get("frequency"):
f = self.repeating.submit(
run_repeatedly, experiment=experiment,
probe=p, configuration=configuration,
probe=p,
interrupt_after_activity=interrupt_after_activity,
configuration=configuration,
secrets=secrets, stop_repeating=self.repeating_until)
elif p.get("background"):
f = self.once.submit(
run_soon, experiment=experiment,
probe=p, configuration=configuration,
probe=p,
interrupt_after_activity=interrupt_after_activity,
configuration=configuration,
secrets=secrets)
else:
f = self.now.submit(
run_now, experiment=experiment,
probe=p, configuration=configuration,
probe=p,
interrupt_after_activity=interrupt_after_activity,
configuration=configuration,
secrets=secrets, done=self.now_all_done)

if f is not None:
Expand Down Expand Up @@ -202,34 +227,60 @@ def terminate(self) -> None:
self.repeating.shutdown(wait=True)
self.once.shutdown(wait=True)

def should_exit_before_activity(self) -> bool:
return self._interrupt_after_activity and state.interrupted


guardian = Guardian()
state = State()


def configure_control(configuration: Configuration = None,
secrets: Secrets = None, settings: Settings = None,
experiment: Experiment = None,
probes: List[Probe] = None) -> None:
guardian.prepare(probes)
probes: List[Probe] = None,
interrupt_after_activity: Optional[bool] = None) \
-> None:
guardian.prepare(probes, interrupt_after_activity)


def before_experiment_control(context: str,
configuration: Configuration = None,
secrets: Secrets = None,
settings: Settings = None,
experiment: Experiment = None,
probes: List[Probe] = None) -> None:
guardian.run(experiment, probes, configuration, secrets, settings)
probes: List[Probe] = None,
interrupt_after_activity: Optional[bool] = None) \
-> None:
guardian.run(
experiment, probes, interrupt_after_activity, configuration,
secrets, settings)


def after_experiment_control(**kwargs):
guardian.terminate()


def after_activity_control(context: Activity, state: Run,
configuration: Configuration = None,
secrets: Secrets = None,
probes: List[Probe] = None,
interrupt_after_activity: Optional[bool] = None):
# in case we are already finished, this shouldn't occur here though
if experiment_finished.is_set():
return

if guardian.should_exit_before_activity():
raise InterruptExecution(
"Interrupting the experiment, after activity '{}', now as per "
"your safeguards decision".format(context['name']))


###############################################################################
# Internals
###############################################################################
def run_repeatedly(experiment: Experiment, probe: Probe,
interrupt_after_activity: Optional[bool],
configuration: Configuration, secrets: Secrets,
stop_repeating: threading.Event) -> None:
wait_for = probe.get("frequency")
Expand All @@ -240,19 +291,22 @@ def run_repeatedly(experiment: Experiment, probe: Probe,
stop_repeating.wait(timeout=wait_for)
if not stop_repeating.is_set():
interrupt_experiment_on_unhealthy_probe(
probe, run, configuration, secrets)
probe, interrupt_after_activity, run, configuration, secrets)


def run_soon(experiment: Experiment, probe: Probe,
interrupt_after_activity: Optional[bool],
configuration: Configuration,
secrets: Secrets) -> None:
run = execute_activity(
experiment=experiment, probe=probe,
configuration=configuration, secrets=secrets)
interrupt_experiment_on_unhealthy_probe(probe, run, configuration, secrets)
interrupt_experiment_on_unhealthy_probe(
probe, interrupt_after_activity, run, configuration, secrets)


def run_now(experiment: Experiment, probe: Probe,
interrupt_after_activity: Optional[bool],
configuration: Configuration,
secrets: Secrets, done: threading.Barrier) -> None:
try:
Expand All @@ -262,26 +316,35 @@ def run_now(experiment: Experiment, probe: Probe,
finally:
done.wait()

interrupt_experiment_on_unhealthy_probe(probe, run, configuration, secrets)
interrupt_experiment_on_unhealthy_probe(
probe, interrupt_after_activity, run, configuration, secrets)


def interrupt_experiment_on_unhealthy_probe(probe: Probe, run: Run,
configuration: Configuration,
secrets=Secrets) -> None:
def interrupt_experiment_on_unhealthy_probe(
probe: Probe, interrupt_after_activity: Optional[bool], run: Run,
configuration: Configuration, secrets=Secrets) -> None:
if experiment_finished.is_set():
return

tolerance = probe.get("tolerance")
checked = within_tolerance(
tolerance, run["output"], configuration=configuration,
secrets=secrets)
if not checked and not guardian.interrupted:
guardian.interrupted = True
if not checked and not state.interrupted:
state.interrupted = True
if not experiment_finished.is_set():
logger.critical(
"Safeguard '{}' triggered the end of the experiment".format(
probe["name"]))
exit_gracefully()
# we only immediately trigger the interrupt if not asked
# to do it at the next activity instead
if not interrupt_after_activity:
logger.critical(
"Safeguard '{}' triggered the end of the "
"experiment".format(probe["name"]))
exit_gracefully()
else:
logger.critical(
"Safeguard '{}' triggered the end of the "
"experiment. But we will exit only after the current "
"activity is completed".format(probe["name"]))


def execute_activity(experiment: Experiment, probe: Probe,
Expand Down