123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789 |
- # plugin/plugin_base.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- """Testing extensions.
- this module is designed to work as a testing-framework-agnostic library,
- created so that multiple test frameworks can be supported at once
- (mostly so that we can migrate to new ones). The current target
- is pytest.
- """
- from __future__ import absolute_import
- import abc
- import logging
- import re
- import sys
- # flag which indicates we are in the SQLAlchemy testing suite,
- # and not that of Alembic or a third party dialect.
- bootstrapped_as_sqlalchemy = False
- log = logging.getLogger("sqlalchemy.testing.plugin_base")
- py3k = sys.version_info >= (3, 0)
- if py3k:
- import configparser
- ABC = abc.ABC
- else:
- import ConfigParser as configparser
- import collections as collections_abc # noqa
- class ABC(object):
- __metaclass__ = abc.ABCMeta
- # late imports
- fixtures = None
- engines = None
- exclusions = None
- warnings = None
- profiling = None
- provision = None
- assertions = None
- requirements = None
- config = None
- testing = None
- util = None
- file_config = None
- logging = None
- include_tags = set()
- exclude_tags = set()
- options = None
- def setup_options(make_option):
- make_option(
- "--log-info",
- action="callback",
- type=str,
- callback=_log,
- help="turn on info logging for <LOG> (multiple OK)",
- )
- make_option(
- "--log-debug",
- action="callback",
- type=str,
- callback=_log,
- help="turn on debug logging for <LOG> (multiple OK)",
- )
- make_option(
- "--db",
- action="append",
- type=str,
- dest="db",
- help="Use prefab database uri. Multiple OK, "
- "first one is run by default.",
- )
- make_option(
- "--dbs",
- action="callback",
- zeroarg_callback=_list_dbs,
- help="List available prefab dbs",
- )
- make_option(
- "--dburi",
- action="append",
- type=str,
- dest="dburi",
- help="Database uri. Multiple OK, " "first one is run by default.",
- )
- make_option(
- "--dbdriver",
- action="append",
- type=str,
- dest="dbdriver",
- help="Additional database drivers to include in tests. "
- "These are linked to the existing database URLs by the "
- "provisioning system.",
- )
- make_option(
- "--dropfirst",
- action="store_true",
- dest="dropfirst",
- help="Drop all tables in the target database first",
- )
- make_option(
- "--disable-asyncio",
- action="store_true",
- help="disable test / fixtures / provisoning running in asyncio",
- )
- make_option(
- "--backend-only",
- action="store_true",
- dest="backend_only",
- help="Run only tests marked with __backend__ or __sparse_backend__",
- )
- make_option(
- "--nomemory",
- action="store_true",
- dest="nomemory",
- help="Don't run memory profiling tests",
- )
- make_option(
- "--notimingintensive",
- action="store_true",
- dest="notimingintensive",
- help="Don't run timing intensive tests",
- )
- make_option(
- "--profile-sort",
- type=str,
- default="cumulative",
- dest="profilesort",
- help="Type of sort for profiling standard output",
- )
- make_option(
- "--profile-dump",
- type=str,
- dest="profiledump",
- help="Filename where a single profile run will be dumped",
- )
- make_option(
- "--postgresql-templatedb",
- type=str,
- help="name of template database to use for PostgreSQL "
- "CREATE DATABASE (defaults to current database)",
- )
- make_option(
- "--low-connections",
- action="store_true",
- dest="low_connections",
- help="Use a low number of distinct connections - "
- "i.e. for Oracle TNS",
- )
- make_option(
- "--write-idents",
- type=str,
- dest="write_idents",
- help="write out generated follower idents to <file>, "
- "when -n<num> is used",
- )
- make_option(
- "--reversetop",
- action="store_true",
- dest="reversetop",
- default=False,
- help="Use a random-ordering set implementation in the ORM "
- "(helps reveal dependency issues)",
- )
- make_option(
- "--requirements",
- action="callback",
- type=str,
- callback=_requirements_opt,
- help="requirements class for testing, overrides setup.cfg",
- )
- make_option(
- "--with-cdecimal",
- action="store_true",
- dest="cdecimal",
- default=False,
- help="Monkeypatch the cdecimal library into Python 'decimal' "
- "for all tests",
- )
- make_option(
- "--include-tag",
- action="callback",
- callback=_include_tag,
- type=str,
- help="Include tests with tag <tag>",
- )
- make_option(
- "--exclude-tag",
- action="callback",
- callback=_exclude_tag,
- type=str,
- help="Exclude tests with tag <tag>",
- )
- make_option(
- "--write-profiles",
- action="store_true",
- dest="write_profiles",
- default=False,
- help="Write/update failing profiling data.",
- )
- make_option(
- "--force-write-profiles",
- action="store_true",
- dest="force_write_profiles",
- default=False,
- help="Unconditionally write/update profiling data.",
- )
- make_option(
- "--dump-pyannotate",
- type=str,
- dest="dump_pyannotate",
- help="Run pyannotate and dump json info to given file",
- )
- make_option(
- "--mypy-extra-test-path",
- type=str,
- action="append",
- default=[],
- dest="mypy_extra_test_paths",
- help="Additional test directories to add to the mypy tests. "
- "This is used only when running mypy tests. Multiple OK",
- )
- def configure_follower(follower_ident):
- """Configure required state for a follower.
- This invokes in the parent process and typically includes
- database creation.
- """
- from sqlalchemy.testing import provision
- provision.FOLLOWER_IDENT = follower_ident
- def memoize_important_follower_config(dict_):
- """Store important configuration we will need to send to a follower.
- This invokes in the parent process after normal config is set up.
- This is necessary as pytest seems to not be using forking, so we
- start with nothing in memory, *but* it isn't running our argparse
- callables, so we have to just copy all of that over.
- """
- dict_["memoized_config"] = {
- "include_tags": include_tags,
- "exclude_tags": exclude_tags,
- }
- def restore_important_follower_config(dict_):
- """Restore important configuration needed by a follower.
- This invokes in the follower process.
- """
- global include_tags, exclude_tags
- include_tags.update(dict_["memoized_config"]["include_tags"])
- exclude_tags.update(dict_["memoized_config"]["exclude_tags"])
- def read_config():
- global file_config
- file_config = configparser.ConfigParser()
- file_config.read(["setup.cfg", "test.cfg"])
- def pre_begin(opt):
- """things to set up early, before coverage might be setup."""
- global options
- options = opt
- for fn in pre_configure:
- fn(options, file_config)
- def set_coverage_flag(value):
- options.has_coverage = value
- def post_begin():
- """things to set up later, once we know coverage is running."""
- # Lazy setup of other options (post coverage)
- for fn in post_configure:
- fn(options, file_config)
- # late imports, has to happen after config.
- global util, fixtures, engines, exclusions, assertions, provision
- global warnings, profiling, config, testing
- from sqlalchemy import testing # noqa
- from sqlalchemy.testing import fixtures, engines, exclusions # noqa
- from sqlalchemy.testing import assertions, warnings, profiling # noqa
- from sqlalchemy.testing import config, provision # noqa
- from sqlalchemy import util # noqa
- warnings.setup_filters()
- def _log(opt_str, value, parser):
- global logging
- if not logging:
- import logging
- logging.basicConfig()
- if opt_str.endswith("-info"):
- logging.getLogger(value).setLevel(logging.INFO)
- elif opt_str.endswith("-debug"):
- logging.getLogger(value).setLevel(logging.DEBUG)
- def _list_dbs(*args):
- print("Available --db options (use --dburi to override)")
- for macro in sorted(file_config.options("db")):
- print("%20s\t%s" % (macro, file_config.get("db", macro)))
- sys.exit(0)
- def _requirements_opt(opt_str, value, parser):
- _setup_requirements(value)
- def _exclude_tag(opt_str, value, parser):
- exclude_tags.add(value.replace("-", "_"))
- def _include_tag(opt_str, value, parser):
- include_tags.add(value.replace("-", "_"))
- pre_configure = []
- post_configure = []
- def pre(fn):
- pre_configure.append(fn)
- return fn
- def post(fn):
- post_configure.append(fn)
- return fn
- @pre
- def _setup_options(opt, file_config):
- global options
- options = opt
- @pre
- def _set_nomemory(opt, file_config):
- if opt.nomemory:
- exclude_tags.add("memory_intensive")
- @pre
- def _set_notimingintensive(opt, file_config):
- if opt.notimingintensive:
- exclude_tags.add("timing_intensive")
- @pre
- def _monkeypatch_cdecimal(options, file_config):
- if options.cdecimal:
- import cdecimal
- sys.modules["decimal"] = cdecimal
- @post
- def _init_symbols(options, file_config):
- from sqlalchemy.testing import config
- config._fixture_functions = _fixture_fn_class()
- @post
- def _set_disable_asyncio(opt, file_config):
- if opt.disable_asyncio or not py3k:
- from sqlalchemy.testing import asyncio
- asyncio.ENABLE_ASYNCIO = False
- @post
- def _engine_uri(options, file_config):
- from sqlalchemy import testing
- from sqlalchemy.testing import config
- from sqlalchemy.testing import provision
- if options.dburi:
- db_urls = list(options.dburi)
- else:
- db_urls = []
- extra_drivers = options.dbdriver or []
- if options.db:
- for db_token in options.db:
- for db in re.split(r"[,\s]+", db_token):
- if db not in file_config.options("db"):
- raise RuntimeError(
- "Unknown URI specifier '%s'. "
- "Specify --dbs for known uris." % db
- )
- else:
- db_urls.append(file_config.get("db", db))
- if not db_urls:
- db_urls.append(file_config.get("db", "default"))
- config._current = None
- expanded_urls = list(provision.generate_db_urls(db_urls, extra_drivers))
- for db_url in expanded_urls:
- log.info("Adding database URL: %s", db_url)
- if options.write_idents and provision.FOLLOWER_IDENT:
- with open(options.write_idents, "a") as file_:
- file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n")
- cfg = provision.setup_config(
- db_url, options, file_config, provision.FOLLOWER_IDENT
- )
- if not config._current:
- cfg.set_as_current(cfg, testing)
- @post
- def _requirements(options, file_config):
- requirement_cls = file_config.get("sqla_testing", "requirement_cls")
- _setup_requirements(requirement_cls)
- def _setup_requirements(argument):
- from sqlalchemy.testing import config
- from sqlalchemy import testing
- if config.requirements is not None:
- return
- modname, clsname = argument.split(":")
- # importlib.import_module() only introduced in 2.7, a little
- # late
- mod = __import__(modname)
- for component in modname.split(".")[1:]:
- mod = getattr(mod, component)
- req_cls = getattr(mod, clsname)
- config.requirements = testing.requires = req_cls()
- config.bootstrapped_as_sqlalchemy = bootstrapped_as_sqlalchemy
- @post
- def _prep_testing_database(options, file_config):
- from sqlalchemy.testing import config
- if options.dropfirst:
- from sqlalchemy.testing import provision
- for cfg in config.Config.all_configs():
- provision.drop_all_schema_objects(cfg, cfg.db)
- @post
- def _reverse_topological(options, file_config):
- if options.reversetop:
- from sqlalchemy.orm.util import randomize_unitofwork
- randomize_unitofwork()
- @post
- def _post_setup_options(opt, file_config):
- from sqlalchemy.testing import config
- config.options = options
- config.file_config = file_config
- @post
- def _setup_profiling(options, file_config):
- from sqlalchemy.testing import profiling
- profiling._profile_stats = profiling.ProfileStatsFile(
- file_config.get("sqla_testing", "profile_file"),
- sort=options.profilesort,
- dump=options.profiledump,
- )
- def want_class(name, cls):
- if not issubclass(cls, fixtures.TestBase):
- return False
- elif name.startswith("_"):
- return False
- elif (
- config.options.backend_only
- and not getattr(cls, "__backend__", False)
- and not getattr(cls, "__sparse_backend__", False)
- and not getattr(cls, "__only_on__", False)
- ):
- return False
- else:
- return True
- def want_method(cls, fn):
- if not fn.__name__.startswith("test_"):
- return False
- elif fn.__module__ is None:
- return False
- elif include_tags:
- return (
- hasattr(cls, "__tags__")
- and exclusions.tags(cls.__tags__).include_test(
- include_tags, exclude_tags
- )
- ) or (
- hasattr(fn, "_sa_exclusion_extend")
- and fn._sa_exclusion_extend.include_test(
- include_tags, exclude_tags
- )
- )
- elif exclude_tags and hasattr(cls, "__tags__"):
- return exclusions.tags(cls.__tags__).include_test(
- include_tags, exclude_tags
- )
- elif exclude_tags and hasattr(fn, "_sa_exclusion_extend"):
- return fn._sa_exclusion_extend.include_test(include_tags, exclude_tags)
- else:
- return True
- def generate_sub_tests(cls, module):
- if getattr(cls, "__backend__", False) or getattr(
- cls, "__sparse_backend__", False
- ):
- sparse = getattr(cls, "__sparse_backend__", False)
- for cfg in _possible_configs_for_cls(cls, sparse=sparse):
- orig_name = cls.__name__
- # we can have special chars in these names except for the
- # pytest junit plugin, which is tripped up by the brackets
- # and periods, so sanitize
- alpha_name = re.sub(r"[_\[\]\.]+", "_", cfg.name)
- alpha_name = re.sub(r"_+$", "", alpha_name)
- name = "%s_%s" % (cls.__name__, alpha_name)
- subcls = type(
- name,
- (cls,),
- {"_sa_orig_cls_name": orig_name, "__only_on_config__": cfg},
- )
- setattr(module, name, subcls)
- yield subcls
- else:
- yield cls
- def start_test_class_outside_fixtures(cls):
- _do_skips(cls)
- _setup_engine(cls)
- def stop_test_class(cls):
- # close sessions, immediate connections, etc.
- fixtures.stop_test_class_inside_fixtures(cls)
- # close outstanding connection pool connections, dispose of
- # additional engines
- engines.testing_reaper.stop_test_class_inside_fixtures()
- def stop_test_class_outside_fixtures(cls):
- engines.testing_reaper.stop_test_class_outside_fixtures()
- provision.stop_test_class_outside_fixtures(config, config.db, cls)
- try:
- if not options.low_connections:
- assertions.global_cleanup_assertions()
- finally:
- _restore_engine()
- def _restore_engine():
- if config._current:
- config._current.reset(testing)
- def final_process_cleanup():
- engines.testing_reaper.final_cleanup()
- assertions.global_cleanup_assertions()
- _restore_engine()
- def _setup_engine(cls):
- if getattr(cls, "__engine_options__", None):
- opts = dict(cls.__engine_options__)
- opts["scope"] = "class"
- eng = engines.testing_engine(options=opts)
- config._current.push_engine(eng, testing)
- def before_test(test, test_module_name, test_class, test_name):
- # format looks like:
- # "test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause"
- name = getattr(test_class, "_sa_orig_cls_name", test_class.__name__)
- id_ = "%s.%s.%s" % (test_module_name, name, test_name)
- profiling._start_current_test(id_)
- def after_test(test):
- fixtures.after_test()
- engines.testing_reaper.after_test()
- def after_test_fixtures(test):
- engines.testing_reaper.after_test_outside_fixtures(test)
- def _possible_configs_for_cls(cls, reasons=None, sparse=False):
- all_configs = set(config.Config.all_configs())
- if cls.__unsupported_on__:
- spec = exclusions.db_spec(*cls.__unsupported_on__)
- for config_obj in list(all_configs):
- if spec(config_obj):
- all_configs.remove(config_obj)
- if getattr(cls, "__only_on__", None):
- spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
- for config_obj in list(all_configs):
- if not spec(config_obj):
- all_configs.remove(config_obj)
- if getattr(cls, "__only_on_config__", None):
- all_configs.intersection_update([cls.__only_on_config__])
- if hasattr(cls, "__requires__"):
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__requires__:
- check = getattr(requirements, requirement)
- skip_reasons = check.matching_config_reasons(config_obj)
- if skip_reasons:
- all_configs.remove(config_obj)
- if reasons is not None:
- reasons.extend(skip_reasons)
- break
- if hasattr(cls, "__prefer_requires__"):
- non_preferred = set()
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__prefer_requires__:
- check = getattr(requirements, requirement)
- if not check.enabled_for_config(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
- if sparse:
- # pick only one config from each base dialect
- # sorted so we get the same backend each time selecting the highest
- # server version info.
- per_dialect = {}
- for cfg in reversed(
- sorted(
- all_configs,
- key=lambda cfg: (
- cfg.db.name,
- cfg.db.driver,
- cfg.db.dialect.server_version_info,
- ),
- )
- ):
- db = cfg.db.name
- if db not in per_dialect:
- per_dialect[db] = cfg
- return per_dialect.values()
- return all_configs
- def _do_skips(cls):
- reasons = []
- all_configs = _possible_configs_for_cls(cls, reasons)
- if getattr(cls, "__skip_if__", False):
- for c in getattr(cls, "__skip_if__"):
- if c():
- config.skip_test(
- "'%s' skipped by %s" % (cls.__name__, c.__name__)
- )
- if not all_configs:
- msg = "'%s' unsupported on any DB implementation %s%s" % (
- cls.__name__,
- ", ".join(
- "'%s(%s)+%s'"
- % (
- config_obj.db.name,
- ".".join(
- str(dig)
- for dig in exclusions._server_version(config_obj.db)
- ),
- config_obj.db.driver,
- )
- for config_obj in config.Config.all_configs()
- ),
- ", ".join(reasons),
- )
- config.skip_test(msg)
- elif hasattr(cls, "__prefer_backends__"):
- non_preferred = set()
- spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
- for config_obj in all_configs:
- if not spec(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
- if config._current not in all_configs:
- _setup_config(all_configs.pop(), cls)
- def _setup_config(config_obj, ctx):
- config._current.push(config_obj, testing)
- class FixtureFunctions(ABC):
- @abc.abstractmethod
- def skip_test_exception(self, *arg, **kw):
- raise NotImplementedError()
- @abc.abstractmethod
- def combinations(self, *args, **kw):
- raise NotImplementedError()
- @abc.abstractmethod
- def param_ident(self, *args, **kw):
- raise NotImplementedError()
- @abc.abstractmethod
- def fixture(self, *arg, **kw):
- raise NotImplementedError()
- def get_current_test_name(self):
- raise NotImplementedError()
- @abc.abstractmethod
- def mark_base_test_class(self):
- raise NotImplementedError()
- _fixture_fn_class = None
- def set_fixture_functions(fixture_fn_class):
- global _fixture_fn_class
- _fixture_fn_class = fixture_fn_class
|