pytestplugin.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. try:
  2. # installed by bootstrap.py
  3. import sqla_plugin_base as plugin_base
  4. except ImportError:
  5. # assume we're a package, use traditional import
  6. from . import plugin_base
  7. import argparse
  8. import collections
  9. from functools import update_wrapper
  10. import inspect
  11. import itertools
  12. import operator
  13. import os
  14. import re
  15. import sys
  16. import pytest
  17. try:
  18. import xdist # noqa
  19. has_xdist = True
  20. except ImportError:
  21. has_xdist = False
  22. py2k = sys.version_info < (3, 0)
  23. if py2k:
  24. try:
  25. import sqla_reinvent_fixtures as reinvent_fixtures_py2k
  26. except ImportError:
  27. from . import reinvent_fixtures_py2k
  28. def pytest_addoption(parser):
  29. group = parser.getgroup("sqlalchemy")
  30. def make_option(name, **kw):
  31. callback_ = kw.pop("callback", None)
  32. if callback_:
  33. class CallableAction(argparse.Action):
  34. def __call__(
  35. self, parser, namespace, values, option_string=None
  36. ):
  37. callback_(option_string, values, parser)
  38. kw["action"] = CallableAction
  39. zeroarg_callback = kw.pop("zeroarg_callback", None)
  40. if zeroarg_callback:
  41. class CallableAction(argparse.Action):
  42. def __init__(
  43. self,
  44. option_strings,
  45. dest,
  46. default=False,
  47. required=False,
  48. help=None, # noqa
  49. ):
  50. super(CallableAction, self).__init__(
  51. option_strings=option_strings,
  52. dest=dest,
  53. nargs=0,
  54. const=True,
  55. default=default,
  56. required=required,
  57. help=help,
  58. )
  59. def __call__(
  60. self, parser, namespace, values, option_string=None
  61. ):
  62. zeroarg_callback(option_string, values, parser)
  63. kw["action"] = CallableAction
  64. group.addoption(name, **kw)
  65. plugin_base.setup_options(make_option)
  66. plugin_base.read_config()
  67. def pytest_configure(config):
  68. if hasattr(config, "workerinput"):
  69. plugin_base.restore_important_follower_config(config.workerinput)
  70. plugin_base.configure_follower(config.workerinput["follower_ident"])
  71. else:
  72. if config.option.write_idents and os.path.exists(
  73. config.option.write_idents
  74. ):
  75. os.remove(config.option.write_idents)
  76. plugin_base.pre_begin(config.option)
  77. plugin_base.set_coverage_flag(
  78. bool(getattr(config.option, "cov_source", False))
  79. )
  80. plugin_base.set_fixture_functions(PytestFixtureFunctions)
  81. if config.option.dump_pyannotate:
  82. global DUMP_PYANNOTATE
  83. DUMP_PYANNOTATE = True
  84. DUMP_PYANNOTATE = False
  85. @pytest.fixture(autouse=True)
  86. def collect_types_fixture():
  87. if DUMP_PYANNOTATE:
  88. from pyannotate_runtime import collect_types
  89. collect_types.start()
  90. yield
  91. if DUMP_PYANNOTATE:
  92. collect_types.stop()
  93. def pytest_sessionstart(session):
  94. from sqlalchemy.testing import asyncio
  95. asyncio._assume_async(plugin_base.post_begin)
  96. def pytest_sessionfinish(session):
  97. from sqlalchemy.testing import asyncio
  98. asyncio._maybe_async_provisioning(plugin_base.final_process_cleanup)
  99. if session.config.option.dump_pyannotate:
  100. from pyannotate_runtime import collect_types
  101. collect_types.dump_stats(session.config.option.dump_pyannotate)
  102. def pytest_collection_finish(session):
  103. if session.config.option.dump_pyannotate:
  104. from pyannotate_runtime import collect_types
  105. lib_sqlalchemy = os.path.abspath("lib/sqlalchemy")
  106. def _filter(filename):
  107. filename = os.path.normpath(os.path.abspath(filename))
  108. if "lib/sqlalchemy" not in os.path.commonpath(
  109. [filename, lib_sqlalchemy]
  110. ):
  111. return None
  112. if "testing" in filename:
  113. return None
  114. return filename
  115. collect_types.init_types_collection(filter_filename=_filter)
  116. if has_xdist:
  117. import uuid
  118. def pytest_configure_node(node):
  119. from sqlalchemy.testing import provision
  120. from sqlalchemy.testing import asyncio
  121. # the master for each node fills workerinput dictionary
  122. # which pytest-xdist will transfer to the subprocess
  123. plugin_base.memoize_important_follower_config(node.workerinput)
  124. node.workerinput["follower_ident"] = "test_%s" % uuid.uuid4().hex[0:12]
  125. asyncio._maybe_async_provisioning(
  126. provision.create_follower_db, node.workerinput["follower_ident"]
  127. )
  128. def pytest_testnodedown(node, error):
  129. from sqlalchemy.testing import provision
  130. from sqlalchemy.testing import asyncio
  131. asyncio._maybe_async_provisioning(
  132. provision.drop_follower_db, node.workerinput["follower_ident"]
  133. )
  134. def pytest_collection_modifyitems(session, config, items):
  135. # look for all those classes that specify __backend__ and
  136. # expand them out into per-database test cases.
  137. # this is much easier to do within pytest_pycollect_makeitem, however
  138. # pytest is iterating through cls.__dict__ as makeitem is
  139. # called which causes a "dictionary changed size" error on py3k.
  140. # I'd submit a pullreq for them to turn it into a list first, but
  141. # it's to suit the rather odd use case here which is that we are adding
  142. # new classes to a module on the fly.
  143. from sqlalchemy.testing import asyncio
  144. rebuilt_items = collections.defaultdict(
  145. lambda: collections.defaultdict(list)
  146. )
  147. items[:] = [
  148. item
  149. for item in items
  150. if item.getparent(pytest.Class) is not None
  151. and not item.getparent(pytest.Class).name.startswith("_")
  152. ]
  153. test_classes = set(item.getparent(pytest.Class) for item in items)
  154. def collect(element):
  155. for inst_or_fn in element.collect():
  156. if isinstance(inst_or_fn, pytest.Collector):
  157. # no yield from in 2.7
  158. for el in collect(inst_or_fn):
  159. yield el
  160. else:
  161. yield inst_or_fn
  162. def setup_test_classes():
  163. for test_class in test_classes:
  164. for sub_cls in plugin_base.generate_sub_tests(
  165. test_class.cls, test_class.module
  166. ):
  167. if sub_cls is not test_class.cls:
  168. per_cls_dict = rebuilt_items[test_class.cls]
  169. # support pytest 5.4.0 and above pytest.Class.from_parent
  170. ctor = getattr(pytest.Class, "from_parent", pytest.Class)
  171. module = test_class.getparent(pytest.Module)
  172. for fn in collect(
  173. ctor(name=sub_cls.__name__, parent=module)
  174. ):
  175. per_cls_dict[fn.name].append(fn)
  176. # class requirements will sometimes need to access the DB to check
  177. # capabilities, so need to do this for async
  178. asyncio._maybe_async_provisioning(setup_test_classes)
  179. newitems = []
  180. for item in items:
  181. cls_ = item.cls
  182. if cls_ in rebuilt_items:
  183. newitems.extend(rebuilt_items[cls_][item.name])
  184. else:
  185. newitems.append(item)
  186. if py2k:
  187. for item in newitems:
  188. reinvent_fixtures_py2k.scan_for_fixtures_to_use_for_class(item)
  189. # seems like the functions attached to a test class aren't sorted already?
  190. # is that true and why's that? (when using unittest, they're sorted)
  191. items[:] = sorted(
  192. newitems,
  193. key=lambda item: (
  194. item.getparent(pytest.Module).name,
  195. item.getparent(pytest.Class).name,
  196. item.name,
  197. ),
  198. )
  199. def pytest_pycollect_makeitem(collector, name, obj):
  200. if inspect.isclass(obj) and plugin_base.want_class(name, obj):
  201. from sqlalchemy.testing import config
  202. if config.any_async:
  203. obj = _apply_maybe_async(obj)
  204. ctor = getattr(pytest.Class, "from_parent", pytest.Class)
  205. return [
  206. ctor(name=parametrize_cls.__name__, parent=collector)
  207. for parametrize_cls in _parametrize_cls(collector.module, obj)
  208. ]
  209. elif (
  210. inspect.isfunction(obj)
  211. and collector.cls is not None
  212. and plugin_base.want_method(collector.cls, obj)
  213. ):
  214. # None means, fall back to default logic, which includes
  215. # method-level parametrize
  216. return None
  217. else:
  218. # empty list means skip this item
  219. return []
  220. def _is_wrapped_coroutine_function(fn):
  221. while hasattr(fn, "__wrapped__"):
  222. fn = fn.__wrapped__
  223. return inspect.iscoroutinefunction(fn)
  224. def _apply_maybe_async(obj, recurse=True):
  225. from sqlalchemy.testing import asyncio
  226. for name, value in vars(obj).items():
  227. if (
  228. (callable(value) or isinstance(value, classmethod))
  229. and not getattr(value, "_maybe_async_applied", False)
  230. and (name.startswith("test_"))
  231. and not _is_wrapped_coroutine_function(value)
  232. ):
  233. is_classmethod = False
  234. if isinstance(value, classmethod):
  235. value = value.__func__
  236. is_classmethod = True
  237. @_pytest_fn_decorator
  238. def make_async(fn, *args, **kwargs):
  239. return asyncio._maybe_async(fn, *args, **kwargs)
  240. do_async = make_async(value)
  241. if is_classmethod:
  242. do_async = classmethod(do_async)
  243. do_async._maybe_async_applied = True
  244. setattr(obj, name, do_async)
  245. if recurse:
  246. for cls in obj.mro()[1:]:
  247. if cls != object:
  248. _apply_maybe_async(cls, False)
  249. return obj
  250. def _parametrize_cls(module, cls):
  251. """implement a class-based version of pytest parametrize."""
  252. if "_sa_parametrize" not in cls.__dict__:
  253. return [cls]
  254. _sa_parametrize = cls._sa_parametrize
  255. classes = []
  256. for full_param_set in itertools.product(
  257. *[params for argname, params in _sa_parametrize]
  258. ):
  259. cls_variables = {}
  260. for argname, param in zip(
  261. [_sa_param[0] for _sa_param in _sa_parametrize], full_param_set
  262. ):
  263. if not argname:
  264. raise TypeError("need argnames for class-based combinations")
  265. argname_split = re.split(r",\s*", argname)
  266. for arg, val in zip(argname_split, param.values):
  267. cls_variables[arg] = val
  268. parametrized_name = "_".join(
  269. # token is a string, but in py2k pytest is giving us a unicode,
  270. # so call str() on it.
  271. str(re.sub(r"\W", "", token))
  272. for param in full_param_set
  273. for token in param.id.split("-")
  274. )
  275. name = "%s_%s" % (cls.__name__, parametrized_name)
  276. newcls = type.__new__(type, name, (cls,), cls_variables)
  277. setattr(module, name, newcls)
  278. classes.append(newcls)
  279. return classes
  280. _current_class = None
  281. def pytest_runtest_setup(item):
  282. from sqlalchemy.testing import asyncio
  283. # pytest_runtest_setup runs *before* pytest fixtures with scope="class".
  284. # plugin_base.start_test_class_outside_fixtures may opt to raise SkipTest
  285. # for the whole class and has to run things that are across all current
  286. # databases, so we run this outside of the pytest fixture system altogether
  287. # and ensure asyncio greenlet if any engines are async
  288. global _current_class
  289. if isinstance(item, pytest.Function) and _current_class is None:
  290. asyncio._maybe_async_provisioning(
  291. plugin_base.start_test_class_outside_fixtures,
  292. item.cls,
  293. )
  294. _current_class = item.getparent(pytest.Class)
  295. @pytest.hookimpl(hookwrapper=True)
  296. def pytest_runtest_teardown(item, nextitem):
  297. # runs inside of pytest function fixture scope
  298. # after test function runs
  299. from sqlalchemy.testing import asyncio
  300. from sqlalchemy.util import string_types
  301. asyncio._maybe_async(plugin_base.after_test, item)
  302. yield
  303. # this is now after all the fixture teardown have run, the class can be
  304. # finalized. Since pytest v7 this finalizer can no longer be added in
  305. # pytest_runtest_setup since the class has not yet been setup at that
  306. # time.
  307. # See https://github.com/pytest-dev/pytest/issues/9343
  308. global _current_class, _current_report
  309. if _current_class is not None and (
  310. # last test or a new class
  311. nextitem is None
  312. or nextitem.getparent(pytest.Class) is not _current_class
  313. ):
  314. _current_class = None
  315. try:
  316. asyncio._maybe_async_provisioning(
  317. plugin_base.stop_test_class_outside_fixtures, item.cls
  318. )
  319. except Exception as e:
  320. # in case of an exception during teardown attach the original
  321. # error to the exception message, otherwise it will get lost
  322. if _current_report.failed:
  323. if not e.args:
  324. e.args = (
  325. "__Original test failure__:\n"
  326. + _current_report.longreprtext,
  327. )
  328. elif e.args[-1] and isinstance(e.args[-1], string_types):
  329. args = list(e.args)
  330. args[-1] += (
  331. "\n__Original test failure__:\n"
  332. + _current_report.longreprtext
  333. )
  334. e.args = tuple(args)
  335. else:
  336. e.args += (
  337. "__Original test failure__",
  338. _current_report.longreprtext,
  339. )
  340. raise
  341. finally:
  342. _current_report = None
  343. def pytest_runtest_call(item):
  344. # runs inside of pytest function fixture scope
  345. # before test function runs
  346. from sqlalchemy.testing import asyncio
  347. asyncio._maybe_async(
  348. plugin_base.before_test,
  349. item,
  350. item.module.__name__,
  351. item.cls,
  352. item.name,
  353. )
  354. _current_report = None
  355. def pytest_runtest_logreport(report):
  356. global _current_report
  357. if report.when == "call":
  358. _current_report = report
  359. @pytest.fixture(scope="class")
  360. def setup_class_methods(request):
  361. from sqlalchemy.testing import asyncio
  362. cls = request.cls
  363. if hasattr(cls, "setup_test_class"):
  364. asyncio._maybe_async(cls.setup_test_class)
  365. if py2k:
  366. reinvent_fixtures_py2k.run_class_fixture_setup(request)
  367. yield
  368. if py2k:
  369. reinvent_fixtures_py2k.run_class_fixture_teardown(request)
  370. if hasattr(cls, "teardown_test_class"):
  371. asyncio._maybe_async(cls.teardown_test_class)
  372. asyncio._maybe_async(plugin_base.stop_test_class, cls)
  373. @pytest.fixture(scope="function")
  374. def setup_test_methods(request):
  375. from sqlalchemy.testing import asyncio
  376. # called for each test
  377. self = request.instance
  378. # before this fixture runs:
  379. # 1. function level "autouse" fixtures under py3k (examples: TablesTest
  380. # define tables / data, MappedTest define tables / mappers / data)
  381. # 2. run homegrown function level "autouse" fixtures under py2k
  382. if py2k:
  383. reinvent_fixtures_py2k.run_fn_fixture_setup(request)
  384. # 3. run outer xdist-style setup
  385. if hasattr(self, "setup_test"):
  386. asyncio._maybe_async(self.setup_test)
  387. # alembic test suite is using setUp and tearDown
  388. # xdist methods; support these in the test suite
  389. # for the near term
  390. if hasattr(self, "setUp"):
  391. asyncio._maybe_async(self.setUp)
  392. # inside the yield:
  393. # 4. function level fixtures defined on test functions themselves,
  394. # e.g. "connection", "metadata" run next
  395. # 5. pytest hook pytest_runtest_call then runs
  396. # 6. test itself runs
  397. yield
  398. # yield finishes:
  399. # 7. function level fixtures defined on test functions
  400. # themselves, e.g. "connection" rolls back the transaction, "metadata"
  401. # emits drop all
  402. # 8. pytest hook pytest_runtest_teardown hook runs, this is associated
  403. # with fixtures close all sessions, provisioning.stop_test_class(),
  404. # engines.testing_reaper -> ensure all connection pool connections
  405. # are returned, engines created by testing_engine that aren't the
  406. # config engine are disposed
  407. asyncio._maybe_async(plugin_base.after_test_fixtures, self)
  408. # 10. run xdist-style teardown
  409. if hasattr(self, "tearDown"):
  410. asyncio._maybe_async(self.tearDown)
  411. if hasattr(self, "teardown_test"):
  412. asyncio._maybe_async(self.teardown_test)
  413. # 11. run homegrown function-level "autouse" fixtures under py2k
  414. if py2k:
  415. reinvent_fixtures_py2k.run_fn_fixture_teardown(request)
  416. # 12. function level "autouse" fixtures under py3k (examples: TablesTest /
  417. # MappedTest delete table data, possibly drop tables and clear mappers
  418. # depending on the flags defined by the test class)
  419. def getargspec(fn):
  420. if sys.version_info.major == 3:
  421. return inspect.getfullargspec(fn)
  422. else:
  423. return inspect.getargspec(fn)
  424. def _pytest_fn_decorator(target):
  425. """Port of langhelpers.decorator with pytest-specific tricks."""
  426. from sqlalchemy.util.langhelpers import format_argspec_plus
  427. from sqlalchemy.util.compat import inspect_getfullargspec
  428. def _exec_code_in_env(code, env, fn_name):
  429. exec(code, env)
  430. return env[fn_name]
  431. def decorate(fn, add_positional_parameters=()):
  432. spec = inspect_getfullargspec(fn)
  433. if add_positional_parameters:
  434. spec.args.extend(add_positional_parameters)
  435. metadata = dict(
  436. __target_fn="__target_fn", __orig_fn="__orig_fn", name=fn.__name__
  437. )
  438. metadata.update(format_argspec_plus(spec, grouped=False))
  439. code = (
  440. """\
  441. def %(name)s(%(args)s):
  442. return %(__target_fn)s(%(__orig_fn)s, %(apply_kw)s)
  443. """
  444. % metadata
  445. )
  446. decorated = _exec_code_in_env(
  447. code, {"__target_fn": target, "__orig_fn": fn}, fn.__name__
  448. )
  449. if not add_positional_parameters:
  450. decorated.__defaults__ = getattr(fn, "__func__", fn).__defaults__
  451. decorated.__wrapped__ = fn
  452. return update_wrapper(decorated, fn)
  453. else:
  454. # this is the pytest hacky part. don't do a full update wrapper
  455. # because pytest is really being sneaky about finding the args
  456. # for the wrapped function
  457. decorated.__module__ = fn.__module__
  458. decorated.__name__ = fn.__name__
  459. if hasattr(fn, "pytestmark"):
  460. decorated.pytestmark = fn.pytestmark
  461. return decorated
  462. return decorate
  463. class PytestFixtureFunctions(plugin_base.FixtureFunctions):
  464. def skip_test_exception(self, *arg, **kw):
  465. return pytest.skip.Exception(*arg, **kw)
  466. def mark_base_test_class(self):
  467. return pytest.mark.usefixtures(
  468. "setup_class_methods", "setup_test_methods"
  469. )
  470. _combination_id_fns = {
  471. "i": lambda obj: obj,
  472. "r": repr,
  473. "s": str,
  474. "n": lambda obj: obj.__name__
  475. if hasattr(obj, "__name__")
  476. else type(obj).__name__,
  477. }
  478. def combinations(self, *arg_sets, **kw):
  479. """Facade for pytest.mark.parametrize.
  480. Automatically derives argument names from the callable which in our
  481. case is always a method on a class with positional arguments.
  482. ids for parameter sets are derived using an optional template.
  483. """
  484. from sqlalchemy.testing import exclusions
  485. if sys.version_info.major == 3:
  486. if len(arg_sets) == 1 and hasattr(arg_sets[0], "__next__"):
  487. arg_sets = list(arg_sets[0])
  488. else:
  489. if len(arg_sets) == 1 and hasattr(arg_sets[0], "next"):
  490. arg_sets = list(arg_sets[0])
  491. argnames = kw.pop("argnames", None)
  492. def _filter_exclusions(args):
  493. result = []
  494. gathered_exclusions = []
  495. for a in args:
  496. if isinstance(a, exclusions.compound):
  497. gathered_exclusions.append(a)
  498. else:
  499. result.append(a)
  500. return result, gathered_exclusions
  501. id_ = kw.pop("id_", None)
  502. tobuild_pytest_params = []
  503. has_exclusions = False
  504. if id_:
  505. _combination_id_fns = self._combination_id_fns
  506. # because itemgetter is not consistent for one argument vs.
  507. # multiple, make it multiple in all cases and use a slice
  508. # to omit the first argument
  509. _arg_getter = operator.itemgetter(
  510. 0,
  511. *[
  512. idx
  513. for idx, char in enumerate(id_)
  514. if char in ("n", "r", "s", "a")
  515. ]
  516. )
  517. fns = [
  518. (operator.itemgetter(idx), _combination_id_fns[char])
  519. for idx, char in enumerate(id_)
  520. if char in _combination_id_fns
  521. ]
  522. for arg in arg_sets:
  523. if not isinstance(arg, tuple):
  524. arg = (arg,)
  525. fn_params, param_exclusions = _filter_exclusions(arg)
  526. parameters = _arg_getter(fn_params)[1:]
  527. if param_exclusions:
  528. has_exclusions = True
  529. tobuild_pytest_params.append(
  530. (
  531. parameters,
  532. param_exclusions,
  533. "-".join(
  534. comb_fn(getter(arg)) for getter, comb_fn in fns
  535. ),
  536. )
  537. )
  538. else:
  539. for arg in arg_sets:
  540. if not isinstance(arg, tuple):
  541. arg = (arg,)
  542. fn_params, param_exclusions = _filter_exclusions(arg)
  543. if param_exclusions:
  544. has_exclusions = True
  545. tobuild_pytest_params.append(
  546. (fn_params, param_exclusions, None)
  547. )
  548. pytest_params = []
  549. for parameters, param_exclusions, id_ in tobuild_pytest_params:
  550. if has_exclusions:
  551. parameters += (param_exclusions,)
  552. param = pytest.param(*parameters, id=id_)
  553. pytest_params.append(param)
  554. def decorate(fn):
  555. if inspect.isclass(fn):
  556. if has_exclusions:
  557. raise NotImplementedError(
  558. "exclusions not supported for class level combinations"
  559. )
  560. if "_sa_parametrize" not in fn.__dict__:
  561. fn._sa_parametrize = []
  562. fn._sa_parametrize.append((argnames, pytest_params))
  563. return fn
  564. else:
  565. if argnames is None:
  566. _argnames = getargspec(fn).args[1:]
  567. else:
  568. _argnames = re.split(r", *", argnames)
  569. if has_exclusions:
  570. _argnames += ["_exclusions"]
  571. @_pytest_fn_decorator
  572. def check_exclusions(fn, *args, **kw):
  573. _exclusions = args[-1]
  574. if _exclusions:
  575. exlu = exclusions.compound().add(*_exclusions)
  576. fn = exlu(fn)
  577. return fn(*args[0:-1], **kw)
  578. def process_metadata(spec):
  579. spec.args.append("_exclusions")
  580. fn = check_exclusions(
  581. fn, add_positional_parameters=("_exclusions",)
  582. )
  583. return pytest.mark.parametrize(_argnames, pytest_params)(fn)
  584. return decorate
  585. def param_ident(self, *parameters):
  586. ident = parameters[0]
  587. return pytest.param(*parameters[1:], id=ident)
  588. def fixture(self, *arg, **kw):
  589. from sqlalchemy.testing import config
  590. from sqlalchemy.testing import asyncio
  591. # wrapping pytest.fixture function. determine if
  592. # decorator was called as @fixture or @fixture().
  593. if len(arg) > 0 and callable(arg[0]):
  594. # was called as @fixture(), we have the function to wrap.
  595. fn = arg[0]
  596. arg = arg[1:]
  597. else:
  598. # was called as @fixture, don't have the function yet.
  599. fn = None
  600. # create a pytest.fixture marker. because the fn is not being
  601. # passed, this is always a pytest.FixtureFunctionMarker()
  602. # object (or whatever pytest is calling it when you read this)
  603. # that is waiting for a function.
  604. fixture = pytest.fixture(*arg, **kw)
  605. # now apply wrappers to the function, including fixture itself
  606. def wrap(fn):
  607. if config.any_async:
  608. fn = asyncio._maybe_async_wrapper(fn)
  609. # other wrappers may be added here
  610. if py2k and "autouse" in kw:
  611. # py2k workaround for too-slow collection of autouse fixtures
  612. # in pytest 4.6.11. See notes in reinvent_fixtures_py2k for
  613. # rationale.
  614. # comment this condition out in order to disable the
  615. # py2k workaround entirely.
  616. reinvent_fixtures_py2k.add_fixture(fn, fixture)
  617. else:
  618. # now apply FixtureFunctionMarker
  619. fn = fixture(fn)
  620. return fn
  621. if fn:
  622. return wrap(fn)
  623. else:
  624. return wrap
  625. def get_current_test_name(self):
  626. return os.environ.get("PYTEST_CURRENT_TEST")
  627. def async_test(self, fn):
  628. from sqlalchemy.testing import asyncio
  629. @_pytest_fn_decorator
  630. def decorate(fn, *args, **kwargs):
  631. asyncio._run_coroutine_function(fn, *args, **kwargs)
  632. return decorate(fn)