Skip to content

Commit 01788d7

Browse files
author
Nathan Lee
committed
Removal of legacy style workers. Removed generate_worker and SHELL mode process list option. Patched major file descriptor leak due to joined proc handles not being GC'ed.
1 parent f509395 commit 01788d7

File tree

7 files changed

+34
-171
lines changed

7 files changed

+34
-171
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# python-batch-runner
2+
[![Documentation Status](https://readthedocs.org/projects/python-batch-runner/badge/?version=latest)](https://python-batch-runner.readthedocs.io/en/latest/?badge=latest)
3+
24
For more complete documentation, please see: https://python-batch-runner.readthedocs.io/
35

46
python-batch-runner is a microframework to assist with building small to medium scale batch applications without needing to build the scaffolding from scratch.

pyrunner/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def setup():
127127
app_profile.write('export APP_EXEC_TIMESTAMP=$(date +"%Y%m%d_%H%M%S")\n\n')
128128
app_profile.write('export APP_LOG_DIR="${APP_ROOT_LOG_DIR}/${DATE}"\n\n')
129129
app_profile.write('if [ ! -e ${APP_LOG_DIR} ]; then mkdir -p ${APP_LOG_DIR}; fi\n')
130-
app_profile.write('if [ ! -e ${APP_TEMP_DIR} ]; then mkdir ${APP_TEMP_DIR}; fi\n')
130+
app_profile.write('if [ ! -e ${APP_TEMP_DIR} ]; then mkdir -p ${APP_TEMP_DIR}; fi\n')
131131

132132
print('Creating Blank Process List File: {}/config/{}.lst'.format(app_root, app_name))
133133
with open('{}/config/{}.lst'.format(app_root, app_name), 'w') as lst_file:

pyrunner/core/constants.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@
2828

2929
EXECUTION_TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
3030

31-
MODE_SHELL = 'SHELL'
3231
MODE_PYTHON = 'PYTHON'
33-
34-
HEADER_SHELL = '#{}\n#ID|PARENT_IDS|MAX_ATTEMPTS|RETRY_WAIT_TIME|PROCESS_NAME|SHELL_COMMAND|LOGFILE'.format(MODE_SHELL)
3532
HEADER_PYTHON = '#{}\n#ID|PARENT_IDS|MAX_ATTEMPTS|RETRY_WAIT_TIME|PROCESS_NAME|MODULE_NAME|WORKER_NAME|ARGUMENTS|LOGFILE'.format(MODE_PYTHON)
3633

3734
ROOT_NODE_NAME = 'PyRunnerRootNode'
@@ -40,7 +37,6 @@
4037
4138
import os, sys
4239
from pyrunner import PyRunner
43-
from pathlib import Path
4440
4541
# Determine absolute path of this file's parent directory at runtime
4642
abs_dir_path = os.path.dirname(os.path.realpath(__file__))

pyrunner/core/node.py

Lines changed: 3 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def execute(self):
110110
# If it does not extend the Worker class, initialize a reverse-Worker in which the
111111
# worker extends the provided class.
112112
else:
113-
worker = self.generate_worker()(self.context, self._retcode, self.logfile, self.argv)
113+
raise TypeError('{}.{} is not an extension of pyrunner.Worker'.format(self.module, self.worker))
114114

115115
# Launch the "run" method of the provided Worker under a new process.
116116
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False)
@@ -146,6 +146,7 @@ def poll(self, wait=False):
146146
# causing the thread to block until it's job is complete.
147147
self._thread.join()
148148
self._end_time = time.time()
149+
self._thread = None
149150
if self.retcode > 0:
150151
if self._attempts < self.max_attempts:
151152
logger = lg.FileLogger(self.logfile)
@@ -154,6 +155,7 @@ def poll(self, wait=False):
154155
self._must_wait = True
155156
self._wait_start = time.time()
156157
logger.restart_message(self._attempts)
158+
logger.close()
157159
self._retcode.value = -1
158160
elif (time.time() - self._start_time) >= self._timeout:
159161
self._thread.terminate()
@@ -233,100 +235,6 @@ def get_elapsed_time(self):
233235
else:
234236
return '00:00:00'
235237

236-
# ########################## GENERATE WORKER ########################## #
237-
238-
def generate_worker(self):
239-
"""
240-
* For backwards compatibility with earlier versions. *
241-
242-
Returns a generic Worker object which extends the user-defined parent
243-
class. This is done in order to expose the context, logger, and argv
244-
attributes to the user-defined worker.
245-
"""
246-
parent_class = getattr(importlib.import_module(self.module), self.worker)
247-
248-
class Worker(parent_class):
249-
250-
def __init__(self, context, retcode, logfile, argv):
251-
self._context = context
252-
self._retcode = retcode
253-
self.logfile = logfile
254-
self.logger = lg.FileLogger(logfile).open()
255-
self.argv = argv
256-
return
257-
258-
@property
259-
def context(self):
260-
return getattr(self, '_context', None)
261-
@context.setter
262-
def context(self, value):
263-
self._context = value
264-
return self
265-
266-
@property
267-
def retcode(self):
268-
return self._retcode.value
269-
@retcode.setter
270-
def retcode(self, value):
271-
if int(value) < 0:
272-
raise ValueError('retcode must be 0 or greater - received: {}'.format(value))
273-
self._retcode.value = int(value)
274-
return self
275-
276-
# TODO: Need to deprecate
277-
def set_return_code(self, value):
278-
self.retcode = int(value)
279-
return
280-
281-
def protected_run(self):
282-
'''Initiate worker class run method and additionally trigger methods if defined
283-
for other lifecycle steps.'''
284-
285-
# RUN
286-
try:
287-
self.retcode = super().run() or self.retcode
288-
except Exception as e:
289-
self.logger.error("Uncaught Exception from Worker Thread (RUN)")
290-
self.logger.error(str(e))
291-
self.logger.error(traceback.format_exc())
292-
self.retcode = 901
293-
294-
if not self.retcode:
295-
# ON SUCCESS
296-
if parent_class.__dict__.get('on_success'):
297-
try:
298-
self.retcode = super().on_success() or self.retcode
299-
except Exception as e:
300-
self.logger.error('Uncaught Exception from Worker Thread (ON_SUCCESS)')
301-
self.logger.error(str(e))
302-
self.logger.error(traceback.format_exc())
303-
self.retcode = 902
304-
else:
305-
# ON FAIL
306-
if parent_class.__dict__.get('on_fail'):
307-
try:
308-
self.retcode = super().on_fail() or self.retcode
309-
except Exception as e:
310-
self.logger.error('Uncaught Exception from Worker Thread (ON_FAIL)')
311-
self.logger.error(str(e))
312-
self.logger.error(traceback.format_exc())
313-
self.retcode = 903
314-
315-
# ON EXIT
316-
if parent_class.__dict__.get('on_exit'):
317-
try:
318-
self.retcode = super().on_exit() or self.retcode
319-
except Exception as e:
320-
self.logger.error('Uncaught Exception from Worker Thread (ON_EXIT)')
321-
self.logger.error(str(e))
322-
self.logger.error(traceback.format_exc())
323-
self.retcode = 904
324-
325-
self.logger.close()
326-
327-
return
328-
329-
return Worker
330238

331239
# ########################## SETTERS + GETTERS ########################## #
332240

pyrunner/logger/file.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def close(self):
6969
self.logfile_handle.write("# LOG END - {}\n".format(datetime.now()))
7070
self.logfile_handle.write("############################################################################\n\n")
7171
self.logfile_handle.close()
72+
self.logfile_handle = None
7273
return
7374

7475
def get_file_handle(self):

pyrunner/serde/list.py

Lines changed: 26 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,6 @@ def deserialize(self, proc_file, restart=False):
3535

3636
if not proc_list: raise ValueError('No information read from process list file')
3737

38-
i = 0
39-
header = ''
40-
41-
while not header:
42-
header = proc_list[i].strip()
43-
i += 1
44-
45-
if header[0] != '#':
46-
raise RuntimeError('Missing execution mode header in process list file. Must have at minimum:\n#SHELL\nor\n#PYTHON')
47-
48-
mode = header[1:].split('|')[0]
49-
50-
if mode not in [ constants.MODE_SHELL, constants.MODE_PYTHON ]:
51-
raise RuntimeError('Incorrect execution mode in header: {}'.format(mode))
52-
5338
used_ids = set()
5439

5540
for proc in proc_list:
@@ -85,62 +70,33 @@ def deserialize(self, proc_file, restart=False):
8570

8671
dependencies = [ int(x) for x in sub_details[1].split(',') ]
8772

88-
if mode == constants.MODE_SHELL:
89-
if restart:
90-
register.add_node(
91-
id = id,
92-
dependencies = dependencies,
93-
max_attempts = sub_details[2],
94-
retry_wait_time = sub_details[3],
95-
status = sub_details[4] if sub_details[4] in [ constants.STATUS_COMPLETED, constants.STATUS_NORUN ] else constants.STATUS_PENDING,
96-
name = sub_details[6],
97-
module = 'pyrunner',
98-
worker = 'ShellWorker',
99-
arguments = [sub_details[7]],
100-
logfile = sub_details[8] if len(sub_details) > 8 else None,
101-
named_deps = False
102-
)
103-
else:
104-
register.add_node(
105-
id = id,
106-
dependencies = dependencies,
107-
max_attempts = sub_details[2],
108-
retry_wait_time = sub_details[3],
109-
name = sub_details[4],
110-
module = 'pyrunner',
111-
worker = 'ShellWorker',
112-
arguments = [sub_details[5]],
113-
logfile = sub_details[6] if len(sub_details) > 6 else None,
114-
named_deps = False
115-
)
73+
if restart:
74+
register.add_node(
75+
id = id,
76+
dependencies = dependencies,
77+
max_attempts = sub_details[2],
78+
retry_wait_time = sub_details[3],
79+
status = sub_details[4] if sub_details[4] in [ constants.STATUS_COMPLETED, constants.STATUS_NORUN ] else constants.STATUS_PENDING,
80+
name = sub_details[6],
81+
module = sub_details[7],
82+
worker = sub_details[8],
83+
arguments = [ s.strip('"') if s.strip().startswith('"') and s.strip().endswith('"') else s.strip() for s in comma_pattern.split(sub_details[9])[1::2] ] if len(sub_details) > 9 else None,
84+
logfile = sub_details[10] if len(sub_details) > 10 else None,
85+
named_deps = False
86+
)
11687
else:
117-
if restart:
118-
register.add_node(
119-
id = id,
120-
dependencies = dependencies,
121-
max_attempts = sub_details[2],
122-
retry_wait_time = sub_details[3],
123-
status = sub_details[4] if sub_details[4] in [ constants.STATUS_COMPLETED, constants.STATUS_NORUN ] else constants.STATUS_PENDING,
124-
name = sub_details[6],
125-
module = sub_details[7],
126-
worker = sub_details[8],
127-
arguments = [ s.strip('"') if s.strip().startswith('"') and s.strip().endswith('"') else s.strip() for s in comma_pattern.split(sub_details[9])[1::2] ] if len(sub_details) > 9 else None,
128-
logfile = sub_details[10] if len(sub_details) > 10 else None,
129-
named_deps = False
130-
)
131-
else:
132-
register.add_node(
133-
id = id,
134-
dependencies = dependencies,
135-
max_attempts = sub_details[2],
136-
retry_wait_time = sub_details[3],
137-
name = sub_details[4],
138-
module = sub_details[5],
139-
worker = sub_details[6],
140-
arguments = [ s.strip('"') if s.strip().startswith('"') and s.strip().endswith('"') else s.strip() for s in comma_pattern.split(sub_details[7])[1::2] ] if len(sub_details) > 7 else None,
141-
logfile = sub_details[8] if len(sub_details) > 8 else None,
142-
named_deps = False
143-
)
88+
register.add_node(
89+
id = id,
90+
dependencies = dependencies,
91+
max_attempts = sub_details[2],
92+
retry_wait_time = sub_details[3],
93+
name = sub_details[4],
94+
module = sub_details[5],
95+
worker = sub_details[6],
96+
arguments = [ s.strip('"') if s.strip().startswith('"') and s.strip().endswith('"') else s.strip() for s in comma_pattern.split(sub_details[7])[1::2] ] if len(sub_details) > 7 else None,
97+
logfile = sub_details[8] if len(sub_details) > 8 else None,
98+
named_deps = False
99+
)
144100

145101
return register
146102

pyrunner/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '4.5.0'
1+
__version__ = '5.0.0'

0 commit comments

Comments
 (0)