Skip to content

Commit 3f32232

Browse files
committed
Refactor parallel linting
The previous implementation created new PyLinter objects in the worker (child) processes, which caused failures when running under Prospector: Prospector uses a custom PyLinter class (a class inherited from PyLinter), whereas Pylint naturally just creates a plain PyLinter object. Linting failed because there are options for Prospector's IndentChecker, which was not created in the worker process. The new implementation passes the original PyLinter object into the workers when the workers are created. See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods for details. The performance has remained about the same based on quick tests done with a Django project containing about 30 000 lines of code; with 8 jobs on a quad-core i7, linting took 26-28 seconds with the old implementation and 24-27 seconds with the new implementation.
1 parent a2fa167 commit 3f32232

File tree

2 files changed

+90
-160
lines changed

2 files changed

+90
-160
lines changed

ChangeLog

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ Release date: TBA
99

1010
* Refactor file checking
1111

12-
Remove code duplication and prepare for supporting parallel linting
13-
under Prospector.
12+
Remove code duplication and allow parallel linting using custom PyLinter
13+
classes.
1414

1515
* Added a new check, ``invalid-overridden-method``
1616

pylint/lint.py

Lines changed: 88 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -243,70 +243,8 @@ def _cpu_count() -> int:
243243
return 1
244244

245245

246-
if multiprocessing is not None:
247-
248-
class ChildLinter(multiprocessing.Process):
249-
def run(self):
250-
# pylint: disable=no-member, unbalanced-tuple-unpacking
251-
tasks_queue, results_queue, self._config = self._args
252-
253-
self._config["jobs"] = 1 # Child does not parallelize any further.
254-
self._python3_porting_mode = self._config.pop("python3_porting_mode", None)
255-
self._plugins = self._config.pop("plugins", None)
256-
257-
# Run linter for received files/modules.
258-
for file_or_module in iter(tasks_queue.get, "STOP"):
259-
try:
260-
result = self._run_linter(file_or_module[0])
261-
results_queue.put(result)
262-
except Exception as ex:
263-
print(
264-
"internal error with sending report for module %s"
265-
% file_or_module,
266-
file=sys.stderr,
267-
)
268-
print(ex, file=sys.stderr)
269-
results_queue.put({})
270-
271-
def _run_linter(self, file_or_module):
272-
linter = PyLinter()
273-
274-
# Register standard checkers.
275-
linter.load_default_plugins()
276-
# Load command line plugins.
277-
if self._plugins:
278-
linter.load_plugin_modules(self._plugins)
279-
280-
linter.load_configuration_from_config(self._config)
281-
282-
# Load plugin specific configuration
283-
linter.load_plugin_configuration()
284-
285-
linter.set_reporter(reporters.CollectingReporter())
286-
287-
# Enable the Python 3 checker mode. This option is
288-
# passed down from the parent linter up to here, since
289-
# the Python 3 porting flag belongs to the Run class,
290-
# instead of the Linter class.
291-
if self._python3_porting_mode:
292-
linter.python3_porting_mode()
293-
294-
# Run the checks.
295-
linter.check(file_or_module)
296-
297-
msgs = [_get_new_args(m) for m in linter.reporter.messages]
298-
return (
299-
file_or_module,
300-
linter.file_state.base_name,
301-
linter.current_name,
302-
msgs,
303-
linter.stats,
304-
linter.msg_status,
305-
)
306-
307-
308246
# pylint: disable=too-many-instance-attributes
309-
class PyLinter(
247+
class PyLinter( # pylint: disable=too-many-public-methods
310248
config.OptionsManagerMixIn,
311249
MessagesHandlerMixIn,
312250
reporters.ReportsHandlerMixIn,
@@ -971,16 +909,20 @@ def should_analyze_file(modname, path, is_argument=False):
971909

972910
# pylint: enable=unused-argument
973911

974-
def check(self, files_or_modules):
975-
"""main checking entry: check a list of files or modules from their
976-
name.
977-
"""
912+
def initialize(self):
978913
# initialize msgs_state now that all messages have been registered into
979914
# the store
980915
for msg in self.msgs_store.messages:
981916
if not msg.may_be_emitted():
982917
self._msgs_state[msg.msgid] = False
983918

919+
def check(self, files_or_modules):
920+
"""main checking entry: check a list of files or modules from their
921+
name.
922+
"""
923+
924+
self.initialize()
925+
984926
if not isinstance(files_or_modules, (list, tuple)):
985927
files_or_modules = (files_or_modules,)
986928

@@ -998,100 +940,21 @@ def check(self, files_or_modules):
998940
elif self.config.jobs == 1:
999941
self._check_files(self.get_ast, self._iterate_file_descrs(files_or_modules))
1000942
else:
1001-
self._parallel_check(files_or_modules)
1002-
1003-
def _get_jobs_config(self):
1004-
child_config = collections.OrderedDict()
1005-
filter_options = {"long-help"}
1006-
filter_options.update((opt_name for opt_name, _ in self._external_opts))
1007-
for opt_providers in self._all_options.values():
1008-
for optname, optdict, val in opt_providers.options_and_values():
1009-
if optdict.get("deprecated"):
1010-
continue
1011-
1012-
if optname not in filter_options:
1013-
child_config[optname] = utils._format_option_value(optdict, val)
1014-
child_config["python3_porting_mode"] = self._python3_porting_mode
1015-
child_config["plugins"] = self._dynamic_plugins
1016-
return child_config
1017-
1018-
def _parallel_task(self, files_or_modules):
1019-
# Prepare configuration for child linters.
1020-
child_config = self._get_jobs_config()
1021-
1022-
children = []
1023-
manager = multiprocessing.Manager()
1024-
tasks_queue = manager.Queue()
1025-
results_queue = manager.Queue()
1026-
1027-
# Send files to child linters.
1028-
expanded_files = []
1029-
for descr in self._expand_files(files_or_modules):
1030-
modname, filepath, is_arg = descr["name"], descr["path"], descr["isarg"]
1031-
if self.should_analyze_file(modname, filepath, is_argument=is_arg):
1032-
expanded_files.append(descr)
1033-
1034-
# do not start more jobs than needed
1035-
for _ in range(min(self.config.jobs, len(expanded_files))):
1036-
child_linter = ChildLinter(args=(tasks_queue, results_queue, child_config))
1037-
child_linter.start()
1038-
children.append(child_linter)
1039-
1040-
for files_or_module in expanded_files:
1041-
path = files_or_module["path"]
1042-
tasks_queue.put([path])
1043-
1044-
# collect results from child linters
1045-
failed = False
1046-
for _ in expanded_files:
1047-
try:
1048-
result = results_queue.get()
1049-
except Exception as ex:
1050-
print(
1051-
"internal error while receiving results from child linter",
1052-
file=sys.stderr,
1053-
)
1054-
print(ex, file=sys.stderr)
1055-
failed = True
1056-
break
1057-
yield result
1058-
1059-
# Stop child linters and wait for their completion.
1060-
for _ in range(self.config.jobs):
1061-
tasks_queue.put("STOP")
1062-
for child in children:
1063-
child.join()
1064-
1065-
if failed:
1066-
print("Error occurred, stopping the linter.", file=sys.stderr)
1067-
sys.exit(32)
1068-
1069-
def _parallel_check(self, files_or_modules):
1070-
# Reset stats.
1071-
self.open()
1072-
1073-
all_stats = []
1074-
module = None
1075-
for result in self._parallel_task(files_or_modules):
1076-
if not result:
1077-
continue
1078-
(_, self.file_state.base_name, module, messages, stats, msg_status) = result
1079-
1080-
for msg in messages:
1081-
msg = Message(*msg)
1082-
self.set_current_module(module)
1083-
self.reporter.handle_message(msg)
943+
check_parallel(
944+
self, self.config.jobs, self._iterate_file_descrs(files_or_modules)
945+
)
1084946

1085-
all_stats.append(stats)
1086-
self.msg_status |= msg_status
947+
def check_single_file(self, name, filepath, modname):
948+
"""Check single file
1087949
1088-
self.stats = _merge_stats(all_stats)
1089-
self.current_name = module
950+
The arguments are the same as those documented in _check_files
1090951
1091-
# Insert stats data to local checkers.
1092-
for checker in self.get_checkers():
1093-
if checker is not self:
1094-
checker.stats = self.stats
952+
The initialize() method should be called before calling this method
953+
"""
954+
with self._astroid_module_checker() as check_astroid_module:
955+
self._check_file(
956+
self.get_ast, check_astroid_module, name, filepath, modname
957+
)
1095958

1096959
def _check_files(self, get_ast, file_descrs):
1097960
"""Check all files from file_descrs
@@ -1313,6 +1176,73 @@ def _report_evaluation(self):
13131176
self.reporter.display_reports(sect)
13141177

13151178

1179+
def check_parallel(linter, jobs, files):
1180+
"""Use the given linter to lint the files with the given number of workers (jobs)
1181+
"""
1182+
original_reporter = linter.reporter
1183+
1184+
# Configure linter on the parent process side for the workers
1185+
linter.set_reporter(reporters.CollectingReporter())
1186+
linter.open()
1187+
1188+
# The linter is inherited by all the pool's workers, i.e. the linter
1189+
# is identical to the linter object here. This is required so that
1190+
# a custom PyLinter object (inherited from PyLinter) can be used.
1191+
# See https://github.com/PyCQA/prospector/issues/320
1192+
with multiprocessing.Pool(
1193+
jobs, initializer=_worker_initialize, initargs=[linter]
1194+
) as pool:
1195+
# ..and now that the workers have inherited the linter, the actual reporter
1196+
# can be set back here on the parent process so that results get stored into
1197+
# correct reporter
1198+
linter.set_reporter(original_reporter)
1199+
1200+
all_stats = []
1201+
1202+
for module, messages, stats, msg_status in pool.imap_unordered(
1203+
_worker_check_single_file, files
1204+
):
1205+
linter.set_current_module(module)
1206+
for msg in messages:
1207+
msg = Message(*msg)
1208+
linter.reporter.handle_message(msg)
1209+
1210+
all_stats.append(stats)
1211+
linter.msg_status |= msg_status
1212+
1213+
linter.stats = _merge_stats(all_stats)
1214+
1215+
# Insert stats data to local checkers.
1216+
for checker in linter.get_checkers():
1217+
if checker is not linter:
1218+
checker.stats = linter.stats
1219+
1220+
1221+
# PyLinter object used by worker processes when checking files using multiprocessing
1222+
# should only be used by the worker processes
1223+
_worker_linter = None
1224+
1225+
1226+
def _worker_initialize(linter):
1227+
global _worker_linter # pylint: disable=global-statement
1228+
_worker_linter = linter
1229+
1230+
1231+
def _worker_check_single_file(file_item):
1232+
name, filepath, modname = file_item
1233+
1234+
_worker_linter.open()
1235+
_worker_linter.check_single_file(name, filepath, modname)
1236+
1237+
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
1238+
return (
1239+
_worker_linter.current_name,
1240+
msgs,
1241+
_worker_linter.stats,
1242+
_worker_linter.msg_status,
1243+
)
1244+
1245+
13161246
# some reporting functions ####################################################
13171247

13181248

0 commit comments

Comments
 (0)