Skip to content

Commit cd21a1d

Browse files
committed
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
This integrates the map/reduce functionality into lint.check_process(). We previously had `map` being invoked; here we add `reduce` support. We do this by collecting the map-data by worker and then passing it to a reducer function on the Checker object, if available - determined by whether they conform to the `mapreduce_checker.MapReduceMixin` mixin interface or not. This allows Checker objects to function across file-streams when using multiprocessing/-j2+. For example, SimilarChecker needs to be able to compare data across all files. The tests, which we also add here, check that a Checker instance returns and reports expected data and errors, such as error-messages and stats - at least in an exit-ok (0) situation. On a personal note, as we are copying more data across process boundaries, I suspect that the memory implications of this might cause issues for large projects already running with -jN and duplicate code detection on. That said, given that it takes a long time to perform lints of large code bases, that is an issue for the [near?] future and likely to be part of the performance work. Either way, let's get it working first and deal with memory and performance considerations later - I say this as there are many quick wins we can make here, e.g. file-batching, hashing lines, data compression and so on.
1 parent 796b293 commit cd21a1d

File tree

7 files changed

+96
-23
lines changed

7 files changed

+96
-23
lines changed

pylint/checkers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# Copyright (c) 2018-2019 Pierre Sassoulas <[email protected]>
1111
# Copyright (c) 2018 ssolanki <[email protected]>
1212
# Copyright (c) 2019 Bruno P. Kinoshita <[email protected]>
13+
# Copyright (c) 2020 Frank Harrison <[email protected]>
1314

1415
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
1516
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
@@ -43,6 +44,7 @@
4344
"""
4445

4546
from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
47+
from pylint.checkers.mapreduce_checker import MapReduceMixin
4648
from pylint.utils import register_plugins
4749

4850

pylint/checkers/mapreduce_checker.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Copyright (c) 2020 Frank Harrison <[email protected]>
2+
3+
# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
4+
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
5+
import abc
6+
7+
8+
class MapReduceMixin(metaclass=abc.ABCMeta):
9+
""" A mixin design to allow multiprocess/threaded runs of a Checker """
10+
11+
@abc.abstractmethod
12+
def get_map_data(self):
13+
""" Returns mergable/reducible data that will be examined """
14+
15+
@classmethod
16+
@abc.abstractmethod
17+
def reduce_map_data(cls, linter, data):
18+
""" For a given Checker, receives data for all mapped runs """

pylint/checkers/similar.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
import astroid
3333

34-
from pylint.checkers import BaseChecker, table_lines_from_stats
34+
from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
3535
from pylint.interfaces import IRawChecker
3636
from pylint.reporters.ureports.nodes import Table
3737
from pylint.utils import decoding_stream
@@ -161,17 +161,17 @@ def _iter_sims(self):
161161
yield from self._find_common(lineset, lineset2)
162162

163163
def get_map_data(self):
164-
""" Returns the data we can use for a map/reduce process
164+
"""Returns the data we can use for a map/reduce process
165165
166166
In this case we are returning this instance's Linesets, that is all file
167167
information that will later be used for vectorisation.
168168
"""
169169
return self.linesets
170170

171171
def combine_mapreduce_data(self, linesets_collection):
172-
""" Reduces and recombines data into a format that we can report on
172+
"""Reduces and recombines data into a format that we can report on
173173
174-
The partner function of get_map_data() """
174+
The partner function of get_map_data()"""
175175
self.linesets = [line for lineset in linesets_collection for line in lineset]
176176

177177

@@ -302,7 +302,7 @@ def report_similarities(sect, stats, old_stats):
302302

303303

304304
# wrapper to get a pylint checker from the similar class
305-
class SimilarChecker(BaseChecker, Similar):
305+
class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
306306
"""checks for similarities and duplicated code. This computation may be
307307
memory / CPU intensive, so you should disable it if you experiment some
308308
problems.
@@ -422,9 +422,9 @@ def get_map_data(self):
422422

423423
@classmethod
424424
def reduce_map_data(cls, linter, data):
425-
""" Reduces and recombines data into a format that we can report on
425+
"""Reduces and recombines data into a format that we can report on
426426
427-
The partner function of get_map_data() """
427+
The partner function of get_map_data()"""
428428
recombined = SimilarChecker(linter)
429429
recombined.open()
430430
Similar.combine_mapreduce_data(recombined, linesets_collection=data)

pylint/lint/parallel.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,28 +67,59 @@ def _worker_check_single_file(file_item):
6767

6868
_worker_linter.open()
6969
_worker_linter.check_single_file(name, filepath, modname)
70-
70+
mapreduce_data = collections.defaultdict(list)
71+
for checker in _worker_linter.get_checkers():
72+
try:
73+
data = checker.get_map_data()
74+
except AttributeError:
75+
continue
76+
mapreduce_data[checker.name].append(data)
7177
msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
7278
_worker_linter.reporter.reset()
7379
return (
80+
id(multiprocessing.current_process()),
7481
_worker_linter.current_name,
7582
filepath,
7683
_worker_linter.file_state.base_name,
7784
msgs,
7885
_worker_linter.stats,
7986
_worker_linter.msg_status,
87+
mapreduce_data,
8088
)
8189

8290

91+
def _merge_mapreduce_data(linter, all_mapreduce_data):
92+
""" Merges map/reduce data across workers, invoking relevant APIs on checkers """
93+
# First collate the data, preparing it so we can send it to the checkers for
94+
# validation. The intent here is to collect all the mapreduce data for all checker-
95+
# runs across processes - that will then be passed to a static method on the
96+
# checkers to be reduced and further processed.
97+
collated_map_reduce_data = collections.defaultdict(list)
98+
for linter_data in all_mapreduce_data.values():
99+
for run_data in linter_data:
100+
for checker_name, data in run_data.items():
101+
collated_map_reduce_data[checker_name].extend(data)
102+
103+
# Send the data to checkers that support/require consolidated data
104+
original_checkers = linter.get_checkers()
105+
for checker in original_checkers:
106+
if checker.name in collated_map_reduce_data:
107+
# Assume that if the check has returned map/reduce data that it has the
108+
# reducer function
109+
checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
110+
111+
83112
def check_parallel(linter, jobs, files, arguments=None):
84-
"""Use the given linter to lint the files with given amount of workers (jobs)"""
85-
# The reporter does not need to be passed to worker processess, i.e. the reporter does
86-
# not need to be pickleable
113+
"""Use the given linter to lint the files with given amount of workers (jobs)
114+
This splits the work filestream-by-filestream. If you need to do work across
115+
multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
116+
implement the map/reduce mixin functionality"""
117+
# The reporter does not need to be passed to worker processes, i.e. the reporter does
87118
original_reporter = linter.reporter
88119
linter.reporter = None
89120

90121
# The linter is inherited by all the pool's workers, i.e. the linter
91-
# is identical to the linter object here. This is requred so that
122+
# is identical to the linter object here. This is required so that
92123
# a custom PyLinter object can be used.
93124
initializer = functools.partial(_worker_initialize, arguments=arguments)
94125
with multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter]) as pool:
@@ -99,14 +130,20 @@ def check_parallel(linter, jobs, files, arguments=None):
99130
linter.open()
100131

101132
all_stats = []
133+
all_mapreduce_data = collections.defaultdict(list)
102134

135+
# Maps each file to be worked on by a single _worker_check_single_file() call,
136+
# collecting any map/reduce data by checker module so that we can 'reduce' it
137+
# later.
103138
for (
139+
worker_idx, # used to merge map/reduce data across workers
104140
module,
105141
file_path,
106142
base_name,
107143
messages,
108144
stats,
109145
msg_status,
146+
mapreduce_data,
110147
) in pool.imap_unordered(_worker_check_single_file, files):
111148
linter.file_state.base_name = base_name
112149
linter.set_current_module(module, file_path)
@@ -115,8 +152,9 @@ def check_parallel(linter, jobs, files, arguments=None):
115152
linter.reporter.handle_message(msg)
116153

117154
all_stats.append(stats)
155+
all_mapreduce_data[worker_idx].append(mapreduce_data)
118156
linter.msg_status |= msg_status
119-
157+
_merge_mapreduce_data(linter, all_mapreduce_data)
120158
linter.stats = _merge_stats(all_stats)
121159

122160
# Insert stats data to local checkers.

tests/checkers/unittest_similar.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,7 @@ def test_no_args():
239239

240240

241241
def test_get_map_data():
242-
""" Tests that a SimilarChecker respects the MapReduceMixin interface
243-
"""
242+
"""Tests that a SimilarChecker respects the MapReduceMixin interface"""
244243
linter = PyLinter(reporter=Reporter())
245244

246245
# Add a parallel checker to ensure it can map and reduce

tests/test_check_parallel.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,17 @@ def test_worker_check_single_file_uninitialised(self):
103103
def test_worker_check_single_file_no_checkers(self):
104104
linter = PyLinter(reporter=Reporter())
105105
worker_initialize(linter=linter)
106-
(name, _, _, msgs, stats, msg_status) = worker_check_single_file(
107-
_gen_file_data()
108-
)
106+
107+
(
108+
_, # proc-id
109+
name,
110+
_, # file_path
111+
_, # base_name
112+
msgs,
113+
stats,
114+
msg_status,
115+
_, # mapreduce_data
116+
) = worker_check_single_file(_gen_file_data())
109117
assert name == "--test-file_data-name-0--"
110118
assert [] == msgs
111119
no_errors_status = 0
@@ -140,9 +148,16 @@ def test_worker_check_sequential_checker(self):
140148
# Add the only checker we care about in this test
141149
linter.register_checker(SequentialTestChecker(linter))
142150

143-
(name, _, _, msgs, stats, msg_status) = worker_check_single_file(
144-
_gen_file_data()
145-
)
151+
(
152+
_, # proc-id
153+
name,
154+
_, # file_path
155+
_, # base_name
156+
msgs,
157+
stats,
158+
msg_status,
159+
_, # mapreduce_data
160+
) = worker_check_single_file(_gen_file_data())
146161

147162
# Ensure we return the same data as the single_file_no_checkers test
148163
assert name == "--test-file_data-name-0--"

tests/test_self.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
import pytest
4848

49-
from pylint.constants import MAIN_CHECKER_NAME
49+
from pylint.constants import MAIN_CHECKER_NAME, MSG_TYPES_STATUS
5050
from pylint.lint import Run
5151
from pylint.reporters import JSONReporter
5252
from pylint.reporters.text import BaseReporter, ColorizedTextReporter, TextReporter
@@ -249,7 +249,8 @@ def test_parallel_execution(self):
249249
join(HERE, "functional", "a", "arguments.py"),
250250
join(HERE, "functional", "a", "arguments.py"),
251251
],
252-
code=2,
252+
# We expect similarities to fail and an error
253+
code=MSG_TYPES_STATUS["R"] | MSG_TYPES_STATUS["E"],
253254
)
254255

255256
def test_parallel_execution_missing_arguments(self):

0 commit comments

Comments
 (0)