parallel.py

# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# For details: https://github.com/PyCQA/pylint/blob/main/LICENSE

import collections
import functools
from typing import Any, DefaultDict, Iterable, List, Tuple

from pylint import reporters
from pylint.lint.utils import _patch_sys_path
from pylint.message import Message
from pylint.typing import FileItem, MessageLocationTuple
from pylint.utils import LinterStats, merge_stats

try:
    import multiprocessing
except ImportError:
    multiprocessing = None  # type: ignore[assignment]
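
# The fallback above keeps this module importable on platforms where
# multiprocessing is unavailable; check_parallel() below assumes the
# import succeeded.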

# PyLinter object used by worker processes when checking files using multiprocessing;
# it should only be used by the worker processes.
_worker_linter = None


def _get_new_args(message):
    """Flatten a Message into a picklable tuple of its constructor arguments."""
    location = (
        message.abspath,
        message.path,
        message.module,
        message.obj,
        message.line,
        message.column,
    )
    return (message.msg_id, message.symbol, location, message.msg, message.confidence)
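
# The tuple built above is rebuilt on the parent side in check_parallel() below via
#     Message(msg[0], msg[1], MessageLocationTuple(*msg[2]), msg[3], msg[4])
# so the field order here must stay in sync with MessageLocationTuple's fields.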


def _worker_initialize(linter, arguments=None):
    global _worker_linter  # pylint: disable=global-statement
    _worker_linter = linter

    # On the worker process side the messages are just collected and passed back to
    # the parent process as _worker_check_single_file's return value
    _worker_linter.set_reporter(reporters.CollectingReporter())
    _worker_linter.open()

    # Patch sys.path so that each argument is importable just like in single job mode
    _patch_sys_path(arguments or ())
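
# Note: multiprocessing.Pool runs _worker_initialize() once in each worker process
# at startup; `linter` is supplied through initargs in check_parallel() below, so
# every worker gets its own copy of the PyLinter object.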


def _worker_check_single_file(
    file_item: FileItem,
) -> Tuple[
    int, Any, str, Any, List[Tuple[Any, ...]], LinterStats, Any, DefaultDict[Any, List]
]:
    if not _worker_linter:
        raise Exception("Worker linter not yet initialised")
    _worker_linter.open()
    _worker_linter.check_single_file_item(file_item)
    mapreduce_data = collections.defaultdict(list)
    for checker in _worker_linter.get_checkers():
        try:
            data = checker.get_map_data()
        except AttributeError:
            # Checkers that do not take part in map/reduce simply lack get_map_data()
            continue
        mapreduce_data[checker.name].append(data)
    msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
    _worker_linter.reporter.reset()
    return (
        id(multiprocessing.current_process()),
        _worker_linter.current_name,
        file_item.filepath,
        _worker_linter.file_state.base_name,
        msgs,
        _worker_linter.stats,
        _worker_linter.msg_status,
        mapreduce_data,
    )
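
# mapreduce_data maps a checker's name to its per-run map data, for example
# (illustrative value only, for pylint's similarity checker):
#     {"similarities": [<line-set data for the file just checked>]}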


def _merge_mapreduce_data(linter, all_mapreduce_data):
    """Merges map/reduce data across workers, invoking relevant APIs on checkers."""
    # First collate the data, preparing it so we can send it to the checkers for
    # validation. The intent here is to collect all the mapreduce data for all
    # checker runs across processes - that will then be passed to a static method
    # on the checkers to be reduced and further processed.
    collated_map_reduce_data = collections.defaultdict(list)
    for linter_data in all_mapreduce_data.values():
        for run_data in linter_data:
            for checker_name, data in run_data.items():
                collated_map_reduce_data[checker_name].extend(data)

    # Send the data to checkers that support/require consolidated data
    original_checkers = linter.get_checkers()
    for checker in original_checkers:
        if checker.name in collated_map_reduce_data:
            # Assume that if the checker has returned map/reduce data then it has
            # the reducer function
            checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
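
# A minimal sketch of the checker-side protocol this function relies on (the
# checker below is hypothetical; pylint's SimilarChecker follows this shape):
#
#     class MyChecker(BaseChecker, MapReduceMixin):
#         def get_map_data(self):
#             # called in each worker by _worker_check_single_file()
#             return self._partial_state
#
#         def reduce_map_data(self, linter, data):
#             # called once on the parent with every worker's map data combined
#             ...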


def check_parallel(linter, jobs, files: Iterable[FileItem], arguments=None):
    """Use the given linter to lint the files with the given number of workers (jobs).

    This splits the work filestream-by-filestream. If you need to do work across
    multiple files, as in the similarity checker, then inherit from MapReduceMixin
    and implement the map/reduce functionality.
    """
    # The reporter does not need to be passed to worker processes, i.e. the reporter
    # does not need to be picklable
    original_reporter = linter.reporter
    linter.reporter = None

    # The linter is inherited by all the pool's workers, i.e. the linter
    # is identical to the linter object here. This is required so that
    # a custom PyLinter object can be used.
    initializer = functools.partial(_worker_initialize, arguments=arguments)
    pool = multiprocessing.Pool(  # pylint: disable=consider-using-with
        jobs, initializer=initializer, initargs=[linter]
    )
    # ...and now that the workers have inherited the linter, the actual reporter
    # can be set back here on the parent process so that results get stored into
    # the correct reporter
    linter.set_reporter(original_reporter)
    linter.open()
    try:
        all_stats = []
        all_mapreduce_data = collections.defaultdict(list)

        # Maps each file to be worked on by a single _worker_check_single_file()
        # call, collecting any map/reduce data by checker module so that we can
        # 'reduce' it later.
        for (
            worker_idx,  # used to merge map/reduce data across workers
            module,
            file_path,
            base_name,
            messages,
            stats,
            msg_status,
            mapreduce_data,
        ) in pool.imap_unordered(_worker_check_single_file, files):
            linter.file_state.base_name = base_name
            linter.set_current_module(module, file_path)
            for msg in messages:
                msg = Message(
                    msg[0], msg[1], MessageLocationTuple(*msg[2]), msg[3], msg[4]
                )
                linter.reporter.handle_message(msg)  # type: ignore[attr-defined] # linter.set_reporter() call above makes linter have a reporter attr
            all_stats.append(stats)
            all_mapreduce_data[worker_idx].append(mapreduce_data)
            linter.msg_status |= msg_status
    finally:
        pool.close()
        pool.join()

    _merge_mapreduce_data(linter, all_mapreduce_data)
    linter.stats = merge_stats([linter.stats] + all_stats)

    # Insert stats data into local checkers.
    for checker in linter.get_checkers():
        if checker is not linter:
            checker.stats = linter.stats
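
# Rough usage sketch (in practice PyLinter.check() calls this itself when the
# jobs option is greater than one; the argument values below are illustrative):
#
#     check_parallel(
#         linter,  # a PyLinter instance
#         jobs=4,  # number of worker processes
#         files=linter._iterate_file_descrs(["pkg"]),  # an iterable of FileItem
#         arguments=["pkg"],
#     )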