Skip to content

Commit f4b9af9

Browse files
author
Nathan Lee
committed
Initial changes for ABORT signaling feature.
1 parent a9854d3 commit f4b9af9

File tree

7 files changed

+65
-11
lines changed

7 files changed

+65
-11
lines changed

pyrunner/core/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,17 @@ def ctx_file(self):
238238
else:
239239
return '{}/{}.ctx'.format(self['temp_dir'], self['app_name'])
240240

241+
@property
def abort_sig_file(self):
    """
    Path/filename of the job's ABORT signal file.

    The path has the form ``<temp_dir>/.<app_name>.sig.abort``. The file
    is only expected to exist when an ABORT signal is being sent to a
    running job.

    Returns None when either ``temp_dir`` or ``app_name`` is unset/empty,
    so callers must handle a None result before using it as a path.
    """
    # Both settings are required to build the path; temp_dir is checked
    # first and app_name is only looked up when temp_dir is truthy.
    if self['temp_dir'] and self['app_name']:
        return '{}/.{}.sig.abort'.format(self['temp_dir'], self['app_name'])
    return None
251+
241252
def source_config_file(self, config_file):
242253
"""
243254
Sources config file to export environment variables.

pyrunner/core/engine.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ def initiate(self, **kwargs):
5757
try:
5858
while self.register.running_nodes or self.register.pending_nodes:
5959

60+
# Check for abort signals
61+
if os.path.exists(self.config.abort_sig_file):
62+
print('ABORT signal received! Terminating all running Workers.')
63+
self._abort_all_workers()
64+
return -1
65+
6066
# Poll running nodes for completion/failure
6167
for node in self.register.running_nodes.copy():
6268
retcode = node.poll()
@@ -106,9 +112,8 @@ def initiate(self, **kwargs):
106112
except KeyboardInterrupt:
107113
print('\nKeyboard Interrupt Received')
108114
print('\nCancelling Execution')
109-
for node in self.register.running_nodes:
110-
node.terminate()
111-
return
115+
self._abort_all_workers()
116+
return -1
112117

113118
if not kwargs.get('silent'):
114119
self._print_final_state(ab_code)
@@ -118,6 +123,13 @@ def initiate(self, **kwargs):
118123

119124
return len(self.register.failed_nodes)
120125

126+
def _abort_all_workers(self):
127+
for node in self.register.running_nodes:
128+
node.terminate()
129+
#self.register.running_nodes.remove(node)
130+
#self.register.aborted_nodes.add(node)
131+
#self.register.set_children_defaulted(node)
132+
121133
def _print_current_state(self):
122134
elapsed = time.time() - self.start_time
123135

pyrunner/core/node.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,16 @@ def execute(self):
8686

8787
try:
8888
worker_class = getattr(importlib.import_module(self.module), self.worker)
89+
90+
# Check if provided worker actually extends the Worker class.
8991
if issubclass(worker_class, Worker):
9092
worker = worker_class(self.context, self._retcode, self.logfile, self.argv)
93+
# If it does not extend the Worker class, initialize a reverse-Worker in which the
94+
# worker extends the provided class.
9195
else:
9296
worker = self.generate_worker()(self.context, self._retcode, self.logfile, self.argv)
97+
98+
# Launch the "run" method of the provided Worker under a new process.
9399
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False)
94100
self._thread.start()
95101
except Exception as e:
@@ -130,6 +136,10 @@ def poll(self, wait=False):
130136
def terminate(self, message=None):
    """
    Terminate this node's worker process if it is still alive.

    When the process is alive it is terminated and a SYSTEM-level line is
    appended to the node's log file. Nothing is done (and nothing is
    logged) when the process has already finished.

    Args:
        message: Optional text to write to the node's log. When omitted, a
            neutral message is used, because this method is reached both
            from the keyboard-interrupt handler and from the ABORT-signal
            check, so a SIGINT-specific message would be misleading.

    Returns:
        None
    """
    if not self._thread.is_alive():
        return

    self._thread.terminate()

    if message is None:
        message = ('Termination requested (keyboard interrupt or ABORT signal). '
                   'Terminating Worker and exiting.')

    logger = lg.FileLogger(self.logfile)
    # Argument passed through unchanged from the original call; its meaning
    # is defined by FileLogger.open, which is not visible here.
    logger.open(False)
    try:
        logger._system_(message)
    finally:
        # Always release the log file handle, even if the write fails.
        logger.close()
    return
134144

135145

pyrunner/core/pyrunner.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def plugin_notification(self, obj):
121121
if not isinstance(obj, notification.Notification): raise Exception('Notification plugin must implement the Notification interface')
122122
self.notification = obj
123123

124-
# App lifecycle hooks
124+
# App lifecycle hooks/decorators
125125
def on_create(self, func):
126126
self._on_create_func = func
127127
def on_start(self, func):
@@ -192,15 +192,15 @@ def run(self):
192192

193193
emit_notification = True
194194

195-
# # App lifecycle - SUCCESS
195+
# App lifecycle - SUCCESS
196196
if retcode == 0:
197197
if self._on_success_func:
198198
self._on_success_func()
199199
if not self.config['email_on_success']:
200200
print('Skipping Email Notification: Property "email_on_success" is set to FALSE.')
201201
emit_notification = False
202-
# # App lifecycle - FAIL
203-
else:
202+
# App lifecycle - FAIL (<0 is for ABORT or other interrupt)
203+
elif retcode > 0:
204204
if self._on_fail_func:
205205
self._on_fail_func()
206206
if not self.config['email_on_fail']:
@@ -265,7 +265,12 @@ def zip_log_files(self, exit_status):
265265

266266
try:
267267

268-
suffix = 'FAILURE' if exit_status else 'SUCCESS'
268+
if exit_status == -1:
269+
suffix = 'ABORT'
270+
elif exit_status > 0:
271+
suffix = 'FAILURE'
272+
else:
273+
suffix = 'SUCCESS'
269274

270275
zip_file = "{}/{}_{}_{}.zip".format(self.config['log_dir'], self.config['app_name'], constants.EXECUTION_TIMESTAMP, suffix)
271276
print('Zipping Up Log Files to: {}'.format(zip_file))
@@ -356,9 +361,11 @@ def exec_from(self, id) : return self.register.exec_from(id)
356361
def exec_disable(self, id_list) : return self.register.exec_disable(id_list)
357362

358363
def parse_args(self):
364+
abort = False
365+
359366
opt_list = 'c:l:n:e:x:N:D:A:t:drhiv'
360367
longopt_list = [
361-
'setup', 'help', 'nozip', 'interactive',
368+
'setup', 'help', 'nozip', 'interactive', 'abort',
362369
'restart', 'version', 'dryrun', 'debug',
363370
'preserve-context', 'dump-logs', 'disable-exclusive-jobs',
364371
'email=', 'email-on-fail=', 'email-on-success=', 'ef=', 'es=',
@@ -421,6 +428,8 @@ def parse_args(self):
421428
self.disable_exclusive_jobs = True
422429
elif opt in ['--exec-proc-name']:
423430
self._init_params['exec_proc_name'] = arg
431+
elif opt == '--abort':
432+
abort = True
424433
elif opt in ['--serde']:
425434
if arg.lower() == 'json':
426435
self.plugin_serde(serde.JsonSerDe())
@@ -441,6 +450,11 @@ def parse_args(self):
441450
raise RuntimeError('Config file (app_profile) has not been provided')
442451
self.config.source_config_file(self._init_params['config_file'])
443452

453+
if abort:
454+
print('Submitting ABORT signal to running job for: {}'.format(self.config['app_name']))
455+
open(self.config.abort_sig_file, 'a').close()
456+
sys.exit(0)
457+
444458
# Check if restart is possible (ctllog/ctx files exist)
445459
if self._init_params['restart'] and not self.is_restartable():
446460
self._init_params['restart'] = False

pyrunner/logger/abstract.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ def error(self, text):
5959
"""
6060
self._emit_('ERROR', text)
6161

62+
def _system_(self, text):
63+
"""
64+
Write a generic SYSTEM level log message.
65+
This is reserved for internal control messages.
66+
"""
67+
self._emit_('SYSTEM', text)
68+
6269
@abstractmethod
6370
def restart_message(self, restart_count):
6471
"""

pyrunner/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '4.2.5'
1+
__version__ = '4.3.0'

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,6 @@
3030
license = 'Apache 2.0',
3131
long_description = 'Python utility providing text-based workflow manager.',
3232
entry_points = {
33-
'console_scripts': ['pyrunner=pyrunner.cli:main', 'pyrunner-repo=pyrunner.cli:repo']
33+
'console_scripts': ['pyrunner=pyrunner.cli:main']
3434
}
3535
)

0 commit comments

Comments
 (0)