async.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # Copyright (c) 2015-2018, 2020 Claudiu Popa <pcmanticore@gmail.com>
  2. # Copyright (c) 2017 Derek Gustafson <degustaf@gmail.com>
  3. # Copyright (c) 2019, 2021 Pierre Sassoulas <pierre.sassoulas@gmail.com>
  4. # Copyright (c) 2020 hippo91 <guillaume.peillex@gmail.com>
  5. # Copyright (c) 2021 Daniël van Noord <13665637+DanielNoord@users.noreply.github.com>
  6. # Copyright (c) 2021 Marc Mueller <30130371+cdce8p@users.noreply.github.com>
  7. # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
  8. # For details: https://github.com/PyCQA/pylint/blob/main/LICENSE
  9. """Checker for anything related to the async protocol (PEP 492)."""
  10. import sys
  11. import astroid
  12. from astroid import nodes
  13. from pylint import checkers, interfaces, utils
  14. from pylint.checkers import utils as checker_utils
  15. from pylint.checkers.utils import decorated_with
  16. class AsyncChecker(checkers.BaseChecker):
  17. __implements__ = interfaces.IAstroidChecker
  18. name = "async"
  19. msgs = {
  20. "E1700": (
  21. "Yield inside async function",
  22. "yield-inside-async-function",
  23. "Used when an `yield` or `yield from` statement is "
  24. "found inside an async function.",
  25. {"minversion": (3, 5)},
  26. ),
  27. "E1701": (
  28. "Async context manager '%s' doesn't implement __aenter__ and __aexit__.",
  29. "not-async-context-manager",
  30. "Used when an async context manager is used with an object "
  31. "that does not implement the async context management protocol.",
  32. {"minversion": (3, 5)},
  33. ),
  34. }
  35. def open(self):
  36. self._ignore_mixin_members = utils.get_global_option(
  37. self, "ignore-mixin-members"
  38. )
  39. self._mixin_class_rgx = utils.get_global_option(self, "mixin-class-rgx")
  40. self._async_generators = ["contextlib.asynccontextmanager"]
  41. @checker_utils.check_messages("yield-inside-async-function")
  42. def visit_asyncfunctiondef(self, node: nodes.AsyncFunctionDef) -> None:
  43. for child in node.nodes_of_class(nodes.Yield):
  44. if child.scope() is node and (
  45. sys.version_info[:2] == (3, 5) or isinstance(child, nodes.YieldFrom)
  46. ):
  47. self.add_message("yield-inside-async-function", node=child)
  48. @checker_utils.check_messages("not-async-context-manager")
  49. def visit_asyncwith(self, node: nodes.AsyncWith) -> None:
  50. for ctx_mgr, _ in node.items:
  51. inferred = checker_utils.safe_infer(ctx_mgr)
  52. if inferred is None or inferred is astroid.Uninferable:
  53. continue
  54. if isinstance(inferred, nodes.AsyncFunctionDef):
  55. # Check if we are dealing with a function decorated
  56. # with contextlib.asynccontextmanager.
  57. if decorated_with(inferred, self._async_generators):
  58. continue
  59. elif isinstance(inferred, astroid.bases.AsyncGenerator):
  60. # Check if we are dealing with a function decorated
  61. # with contextlib.asynccontextmanager.
  62. if decorated_with(inferred.parent, self._async_generators):
  63. continue
  64. else:
  65. try:
  66. inferred.getattr("__aenter__")
  67. inferred.getattr("__aexit__")
  68. except astroid.exceptions.NotFoundError:
  69. if isinstance(inferred, astroid.Instance):
  70. # If we do not know the bases of this class,
  71. # just skip it.
  72. if not checker_utils.has_known_bases(inferred):
  73. continue
  74. # Ignore mixin classes if they match the rgx option.
  75. if self._ignore_mixin_members and self._mixin_class_rgx.match(
  76. inferred.name
  77. ):
  78. continue
  79. else:
  80. continue
  81. self.add_message(
  82. "not-async-context-manager", node=node, args=(inferred.name,)
  83. )
  84. def register(linter):
  85. """required method to auto register this checker"""
  86. linter.register_checker(AsyncChecker(linter))