123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- import random
- from . import testing
- from .. import config
- from .. import fixtures
- from .. import util
- from ..assertions import eq_
- from ..assertions import is_false
- from ..assertions import is_true
- from ..config import requirements
- from ..schema import Table
- from ... import CheckConstraint
- from ... import Column
- from ... import ForeignKeyConstraint
- from ... import Index
- from ... import inspect
- from ... import Integer
- from ... import schema
- from ... import String
- from ... import UniqueConstraint
class TableDDLTest(fixtures.TestBase):
    """Backend tests that emit table, index and table-comment DDL.

    Each test builds its tables from the private fixture helpers below and
    then either round-trips a row or checks the result through ``inspect()``.
    """

    __backend__ = True

    def _simple_fixture(self, schema=None):
        """Return ``test_table`` (``id``, ``data``) bound to ``self.metadata``.

        ``schema`` is passed straight through to ``Table``.
        """
        meta = self.metadata
        columns = (
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("data", String(50)),
        )
        return Table("test_table", meta, *columns, schema=schema)

    def _underscore_fixture(self):
        """Return a table whose name and data column start with ``_``."""
        meta = self.metadata
        columns = (
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("_data", String(50)),
        )
        return Table("_test_table", meta, *columns)

    def _table_index_fixture(self, schema=None):
        """Return ``(table, index)``; ``test_index`` covers ``data``."""
        tbl = self._simple_fixture(schema=schema)
        return tbl, Index("test_index", tbl.c.data)

    def _simple_roundtrip(self, table):
        """Insert one row into ``table`` and assert it selects back."""
        row = (1, "some data")
        with config.db.begin() as conn:
            conn.execute(table.insert().values(row))
            fetched = conn.execute(table.select()).first()
            eq_(fetched, row)

    @requirements.create_table
    @util.provide_metadata
    def test_create_table(self):
        """Create the simple table unconditionally, then round-trip a row."""
        tbl = self._simple_fixture()
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.create_table
    @requirements.schemas
    @util.provide_metadata
    def test_create_table_schema(self):
        """Same as ``test_create_table`` but under ``config.test_schema``."""
        tbl = self._simple_fixture(schema=config.test_schema)
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.drop_table
    @util.provide_metadata
    def test_drop_table(self):
        """Create and then drop the simple table, both without checkfirst."""
        tbl = self._simple_fixture()
        tbl.create(config.db, checkfirst=False)
        tbl.drop(config.db, checkfirst=False)

    @requirements.create_table
    @util.provide_metadata
    def test_underscore_names(self):
        """Create the underscore-named table and round-trip a row."""
        tbl = self._underscore_fixture()
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.comment_reflection
    @util.provide_metadata
    def test_add_table_comment(self, connection):
        """Set a table comment via DDL and check that it reflects back."""
        tbl = self._simple_fixture()
        tbl.create(connection, checkfirst=False)

        tbl.comment = "a comment"
        connection.execute(schema.SetTableComment(tbl))

        reflected = inspect(connection).get_table_comment("test_table")
        eq_(reflected, {"text": "a comment"})

    @requirements.comment_reflection
    @util.provide_metadata
    def test_drop_table_comment(self, connection):
        """Set then drop a table comment; reflection must report ``None``."""
        tbl = self._simple_fixture()
        tbl.create(connection, checkfirst=False)

        tbl.comment = "a comment"
        connection.execute(schema.SetTableComment(tbl))
        connection.execute(schema.DropTableComment(tbl))

        reflected = inspect(connection).get_table_comment("test_table")
        eq_(reflected, {"text": None})

    @requirements.table_ddl_if_exists
    @util.provide_metadata
    def test_create_table_if_not_exists(self, connection):
        """Execute ``CreateTable(if_not_exists=True)`` twice in a row."""
        tbl = self._simple_fixture()

        connection.execute(schema.CreateTable(tbl, if_not_exists=True))
        is_true(inspect(connection).has_table("test_table"))

        # The table is present now; the same statement is executed again.
        connection.execute(schema.CreateTable(tbl, if_not_exists=True))

    @requirements.index_ddl_if_exists
    @util.provide_metadata
    def test_create_index_if_not_exists(self, connection):
        """Execute ``CreateIndex(if_not_exists=True)`` twice in a row."""
        tbl, index = self._table_index_fixture()

        connection.execute(schema.CreateTable(tbl, if_not_exists=True))
        is_true(inspect(connection).has_table("test_table"))

        names_before = [
            entry["name"]
            for entry in inspect(connection).get_indexes("test_table")
        ]
        is_false("test_index" in names_before)

        connection.execute(schema.CreateIndex(index, if_not_exists=True))

        names_after = [
            entry["name"]
            for entry in inspect(connection).get_indexes("test_table")
        ]
        is_true("test_index" in names_after)

        # The index is present now; the same statement is executed again.
        connection.execute(schema.CreateIndex(index, if_not_exists=True))

    @requirements.table_ddl_if_exists
    @util.provide_metadata
    def test_drop_table_if_exists(self, connection):
        """Execute ``DropTable(if_exists=True)`` twice in a row."""
        tbl = self._simple_fixture()
        tbl.create(connection)
        is_true(inspect(connection).has_table("test_table"))

        connection.execute(schema.DropTable(tbl, if_exists=True))
        is_false(inspect(connection).has_table("test_table"))

        # The table is gone now; the same statement is executed again.
        connection.execute(schema.DropTable(tbl, if_exists=True))

    @requirements.index_ddl_if_exists
    @util.provide_metadata
    def test_drop_index_if_exists(self, connection):
        """Execute ``DropIndex(if_exists=True)`` twice in a row."""
        tbl, index = self._table_index_fixture()
        tbl.create(connection)

        names_before = [
            entry["name"]
            for entry in inspect(connection).get_indexes("test_table")
        ]
        is_true("test_index" in names_before)

        connection.execute(schema.DropIndex(index, if_exists=True))

        names_after = [
            entry["name"]
            for entry in inspect(connection).get_indexes("test_table")
        ]
        is_false("test_index" in names_after)

        # The index is gone now; the same statement is executed again.
        connection.execute(schema.DropIndex(index, if_exists=True))
class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest):
    """``TableDDLTest`` with ``fixtures.FutureEngineMixin`` mixed in.

    Adds no tests of its own; everything is inherited.
    """
class LongNameBlowoutTest(fixtures.TestBase):
    """Create DDL structures with very long convention-generated names.

    Each builder method (``fk``, ``pk``, ``ix``, ``uq``, ``ck``) installs a
    naming convention ending in a long random suffix, creates the tables and
    returns ``(actual_name, reflected_name)`` for the construct it covers.
    """

    __backend__ = True

    def fk(self, metadata, connection):
        """Build a long-named foreign key; return its actual/reflected name.

        The reflected name is ``None`` when the
        ``foreign_key_constraint_name_reflection`` requirement is not enabled.
        """
        random_tail = "_".join(
            "".join(random.choice("abcdef") for _ in range(20))
            for _ in range(10)
        )
        metadata.naming_convention = {
            "fk": (
                "foreign_key_%(table_name)s_"
                "%(column_0_N_name)s_"
                "%(referred_table_name)s_" + random_tail
            ),
        }

        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            test_needs_fk=True,
        )

        constraint = ForeignKeyConstraint(
            ["aid"], ["a_things_with_stuff.id_long_column_name"]
        )
        Table(
            "b_related_things_of_value",
            metadata,
            Column("aid"),
            constraint,
            test_needs_fk=True,
        )
        # Read the name before any DDL is emitted.
        actual_name = constraint.name

        metadata.create_all(connection)

        if not testing.requires.foreign_key_constraint_name_reflection.enabled:
            return actual_name, None

        reflected = inspect(connection).get_foreign_keys(
            "b_related_things_of_value"
        )
        return actual_name, reflected[0]["name"]

    def pk(self, metadata, connection):
        """Build a long-named primary key; return its actual/reflected name."""
        random_tail = "_".join(
            "".join(random.choice("abcdef") for _ in range(30))
            for _ in range(10)
        )
        metadata.naming_convention = {
            "pk": (
                "primary_key_%(table_name)s_"
                "%(column_0_N_name)s" + random_tail
            ),
        }

        things = Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer, primary_key=True),
        )
        # Read the name before any DDL is emitted.
        actual_name = things.primary_key.name

        metadata.create_all(connection)

        reflected = inspect(connection).get_pk_constraint(
            "a_things_with_stuff"
        )
        return actual_name, reflected["name"]

    def ix(self, metadata, connection):
        """Build a long-named index; return its actual/reflected name."""
        random_tail = "_".join(
            "".join(random.choice("abcdef") for _ in range(30))
            for _ in range(10)
        )
        metadata.naming_convention = {
            "ix": (
                "index_%(table_name)s_"
                "%(column_0_N_name)s" + random_tail
            ),
        }

        things = Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer),
        )
        # Passing ``None`` as the name leaves naming to the convention.
        index = Index(
            None, things.c.id_long_column_name, things.c.id_another_long_name
        )
        actual_name = index.name

        metadata.create_all(connection)

        reflected = inspect(connection).get_indexes("a_things_with_stuff")
        return actual_name, reflected[0]["name"]

    def uq(self, metadata, connection):
        """Build a long-named unique constraint; return actual/reflected."""
        random_tail = "_".join(
            "".join(random.choice("abcdef") for _ in range(30))
            for _ in range(10)
        )
        metadata.naming_convention = {
            "uq": (
                "unique_constraint_%(table_name)s_"
                "%(column_0_N_name)s" + random_tail
            ),
        }

        constraint = UniqueConstraint(
            "id_long_column_name", "id_another_long_name"
        )
        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer),
            constraint,
        )
        # Read the name once the constraint is attached to its table.
        actual_name = constraint.name

        metadata.create_all(connection)

        reflected = inspect(connection).get_unique_constraints(
            "a_things_with_stuff"
        )
        return actual_name, reflected[0]["name"]

    def ck(self, metadata, connection):
        """Build a long-named check constraint; return actual/reflected."""
        random_tail = "_".join(
            "".join(random.choice("abcdef") for _ in range(30))
            for _ in range(10)
        )
        metadata.naming_convention = {
            "ck": "check_constraint_%(table_name)s" + random_tail,
        }

        constraint = CheckConstraint("some_long_column_name > 5")
        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("some_long_column_name", Integer),
            constraint,
        )
        # Read the name once the constraint is attached to its table.
        actual_name = constraint.name

        metadata.create_all(connection)

        reflected = inspect(connection).get_check_constraints(
            "a_things_with_stuff"
        )
        return actual_name, reflected[0]["name"]

    @testing.combinations(
        ("fk",),
        ("pk",),
        ("ix",),
        ("ck", testing.requires.check_constraint_reflection.as_skips()),
        ("uq", testing.requires.unique_constraint_reflection.as_skips()),
        argnames="type_",
    )
    def test_long_convention_name(self, type_, metadata, connection):
        """Compare the generated name with what the backend reflects.

        ``type_`` selects the builder method of the same name on this class.
        """
        builder = getattr(self, type_)
        actual_name, reflected_name = builder(metadata, connection)

        assert len(actual_name) > 255

        if reflected_name is None:
            return

        overlap = actual_name[: len(reflected_name)]
        if len(overlap) >= len(actual_name):
            # Nothing was cut off, so the names must match exactly.
            eq_(overlap, reflected_name)
        else:
            # The reflected name is shorter; the last five characters are
            # left out of the comparison.
            # NOTE(review): presumably the truncated name ends in a
            # generated suffix -- confirm against the truncation logic.
            eq_(overlap[:-5], reflected_name[: len(overlap) - 5])
# Names exported by a star-import of this module.
__all__ = (
    "TableDDLTest",
    "FutureTableDDLTest",
    "LongNameBlowoutTest",
)
|