12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783 |
- import itertools
- from .. import AssertsCompiledSQL
- from .. import AssertsExecutionResults
- from .. import config
- from .. import fixtures
- from ..assertions import assert_raises
- from ..assertions import eq_
- from ..assertions import in_
- from ..assertsql import CursorSQL
- from ..schema import Column
- from ..schema import Table
- from ... import bindparam
- from ... import case
- from ... import column
- from ... import Computed
- from ... import exists
- from ... import false
- from ... import ForeignKey
- from ... import func
- from ... import Identity
- from ... import Integer
- from ... import literal
- from ... import literal_column
- from ... import null
- from ... import select
- from ... import String
- from ... import table
- from ... import testing
- from ... import text
- from ... import true
- from ... import tuple_
- from ... import TupleType
- from ... import union
- from ... import util
- from ... import values
- from ...exc import DatabaseError
- from ...exc import ProgrammingError
- from ...util import collections_abc
class CollateTest(fixtures.TablesTest):
    """Check ORDER BY on a column expression that carries a collation."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Register ``some_table`` (integer pk plus a string column)."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("data", String(100)),
        )
        Table("some_table", metadata, *columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert the two fixture rows into ``some_table``."""
        fixture_rows = [
            {"id": 1, "data": "collate data1"},
            {"id": 2, "data": "collate data2"},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def _assert_result(self, select, result):
        """Execute ``select`` on a fresh connection and compare all rows
        with ``result``."""
        with config.db.connect() as conn:
            fetched = conn.execute(select).fetchall()
            eq_(fetched, result)

    @testing.requires.order_by_collation
    def test_collate_order_by(self):
        """Rows come back in ascending order of the collated ``data``."""
        collation = testing.requires.get_order_by_collation(testing.config)
        some_table = self.tables.some_table

        ordered = select(some_table).order_by(
            some_table.c.data.collate(collation).asc()
        )
        expected = [(1, "collate data1"), (2, "collate data2")]
        self._assert_result(ordered, expected)
class OrderByLabelTest(fixtures.TablesTest):
    """Verify that the dialect emits suitable ORDER BY expressions when
    labeled columns are ordered on.

    In effect this exercises the "supports_simple_order_by_label"
    setting.

    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Register ``some_table`` with integer and string columns."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
            Column("q", String(50)),
            Column("p", String(50)),
        )
        Table("some_table", metadata, *columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert the three fixture rows into ``some_table``."""
        fixture_rows = [
            {"id": 1, "x": 1, "y": 2, "q": "q1", "p": "p3"},
            {"id": 2, "x": 2, "y": 3, "q": "q2", "p": "p2"},
            {"id": 3, "x": 3, "y": 4, "q": "q3", "p": "p1"},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def _assert_result(self, select, result):
        """Execute ``select`` on a fresh connection and compare all rows
        with ``result``."""
        with config.db.connect() as conn:
            fetched = conn.execute(select).fetchall()
            eq_(fetched, result)

    def test_plain(self):
        """Order by a label placed directly on a column."""
        tbl = self.tables.some_table
        labeled = tbl.c.x.label("lx")
        stmt = select(labeled).order_by(labeled)
        self._assert_result(stmt, [(1,), (2,), (3,)])

    def test_composed_int(self):
        """Order by a label placed on an integer expression."""
        tbl = self.tables.some_table
        labeled = (tbl.c.x + tbl.c.y).label("lx")
        stmt = select(labeled).order_by(labeled)
        self._assert_result(stmt, [(3,), (5,), (7,)])

    def test_composed_multiple(self):
        """Order by two labeled expressions, the second one descending."""
        tbl = self.tables.some_table
        summed = (tbl.c.x + tbl.c.y).label("lx")
        joined = (func.lower(tbl.c.q) + tbl.c.p).label("ly")
        stmt = select(summed, joined).order_by(summed, joined.desc())
        expected = [
            (3, util.u("q1p3")),
            (5, util.u("q2p2")),
            (7, util.u("q3p1")),
        ]
        self._assert_result(stmt, expected)

    def test_plain_desc(self):
        """Order descending by a label placed directly on a column."""
        tbl = self.tables.some_table
        labeled = tbl.c.x.label("lx")
        stmt = select(labeled).order_by(labeled.desc())
        self._assert_result(stmt, [(3,), (2,), (1,)])

    def test_composed_int_desc(self):
        """Order descending by a label placed on an integer expression."""
        tbl = self.tables.some_table
        labeled = (tbl.c.x + tbl.c.y).label("lx")
        stmt = select(labeled).order_by(labeled.desc())
        self._assert_result(stmt, [(7,), (5,), (3,)])

    @testing.requires.group_by_complex_expression
    def test_group_by_composed(self):
        """Use one labeled expression in both GROUP BY and ORDER BY."""
        tbl = self.tables.some_table
        labeled = (tbl.c.x + tbl.c.y).label("lx")
        counted = select(func.count(tbl.c.id), labeled)
        stmt = counted.group_by(labeled).order_by(labeled)
        self._assert_result(stmt, [(1, 3), (1, 5), (1, 7)])
class ValuesExpressionTest(fixtures.TestBase):
    """Select from a VALUES construct and read the rows back."""

    __requires__ = ("table_value_constructor",)
    __backend__ = True

    def test_tuples(self, connection):
        """Rows given to ``values().data()`` are returned unchanged."""
        expected = [(1, "name1"), (2, "name2"), (3, "name3")]
        id_col = column("id", Integer)
        name_col = column("name", String)

        # hand the construct its own list so ``expected`` stays independent
        value_expr = values(id_col, name_col, name="my_values").data(
            list(expected)
        )

        fetched = connection.execute(select(value_expr)).all()
        eq_(fetched, expected)
class FetchLimitOffsetTest(fixtures.TablesTest):
    """Execute SELECT statements that use LIMIT, OFFSET and FETCH.

    Every test selects from a five-row ``some_table`` fixture and compares
    the fetched rows with an expected list, or with an expected set where
    the test passes ``set_=True`` / builds a set itself.

    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Register ``some_table`` with an integer pk and two integers."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert five rows; the last two share the same ``x`` value."""
        connection.execute(
            cls.tables.some_table.insert(),
            [
                {"id": 1, "x": 1, "y": 2},
                {"id": 2, "x": 2, "y": 3},
                {"id": 3, "x": 3, "y": 4},
                {"id": 4, "x": 4, "y": 5},
                {"id": 5, "x": 4, "y": 6},
            ],
        )

    def _assert_result(
        self, connection, select, result, params=(), set_=False
    ):
        """Execute ``select`` on ``connection`` and compare the rows.

        :param connection: connection used to execute the statement.
        :param select: statement to execute.
        :param result: expected rows.
        :param params: parameters passed along to ``execute()``.
        :param set_: when true, compare row count and row *sets* instead
         of the ordered list.

        """
        if set_:
            query_res = connection.execute(select, params).fetchall()
            eq_(len(query_res), len(result))
            eq_(set(query_res), set(result))
        else:
            eq_(connection.execute(select, params).fetchall(), result)

    def _assert_result_str(self, select, result, params=()):
        """Run the SQL string ``select`` through ``exec_driver_sql()`` and
        compare all rows with ``result``.

        The connection is scoped by a ``with`` block so that it is released
        even when the assertion fails.

        """
        with config.db.connect() as conn:
            eq_(conn.exec_driver_sql(select, params).fetchall(), result)

    def test_simple_limit(self, connection):
        """LIMIT with literal integer values."""
        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id)
        self._assert_result(
            connection,
            stmt.limit(2),
            [(1, 1, 2), (2, 2, 3)],
        )
        self._assert_result(
            connection,
            stmt.limit(3),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    def test_limit_render_multiple_times(self, connection):
        """The same limited scalar subquery is used twice in a UNION."""
        table = self.tables.some_table
        stmt = select(table.c.id).limit(1).scalar_subquery()

        u = union(select(stmt), select(stmt)).subquery().select()

        self._assert_result(
            connection,
            u,
            [
                (1,),
            ],
        )

    @testing.requires.fetch_first
    def test_simple_fetch(self, connection):
        """FETCH with literal integer values."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(2),
            [(1, 1, 2), (2, 2, 3)],
        )
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(3),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.offset
    def test_simple_offset(self, connection):
        """OFFSET with literal integer values and no LIMIT."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(2),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(3),
            [(4, 4, 5), (5, 4, 6)],
        )

    @testing.combinations(
        ([(2, 0), (2, 1), (3, 2)]),
        ([(2, 1), (2, 0), (3, 2)]),
        ([(3, 1), (2, 1), (3, 1)]),
        argnames="cases",
    )
    @testing.requires.offset
    def test_simple_limit_offset(self, connection, cases):
        """Run a sequence of (limit, offset) pairs on one connection that
        has a ``compiled_cache`` execution option set."""
        table = self.tables.some_table
        connection = connection.execution_options(compiled_cache={})

        assert_data = [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)]

        for limit, offset in cases:
            # the expected window is the matching slice of the fixture rows
            expected = assert_data[offset : offset + limit]
            self._assert_result(
                connection,
                select(table).order_by(table.c.id).limit(limit).offset(offset),
                expected,
            )

    @testing.requires.fetch_first
    def test_simple_fetch_offset(self, connection):
        """FETCH combined with OFFSET, both literal integers."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(2).offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(3).offset(2),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.fetch_no_order_by
    def test_fetch_offset_no_order(self, connection):
        """FETCH without ORDER BY; rows are compared as a set."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).fetch(10),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.offset
    def test_simple_offset_zero(self, connection):
        """OFFSET 0 returns every row; OFFSET 1 skips the first."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(0),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(1),
            [(2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.offset
    def test_limit_offset_nobinds(self):
        """test that 'literal binds' mode works - no bound params."""

        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id).limit(2).offset(1)
        sql = stmt.compile(
            dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
        )
        sql = str(sql)

        self._assert_result_str(sql, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.fetch_first
    def test_fetch_offset_nobinds(self):
        """test that 'literal binds' mode works - no bound params."""

        table = self.tables.some_table
        stmt = select(table).order_by(table.c.id).fetch(2).offset(1)
        sql = stmt.compile(
            dialect=config.db.dialect, compile_kwargs={"literal_binds": True}
        )
        sql = str(sql)

        self._assert_result_str(sql, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.bound_limit_offset
    def test_bound_limit(self, connection):
        """LIMIT supplied through a named bind parameter."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).limit(bindparam("l")),
            [(1, 1, 2), (2, 2, 3)],
            params={"l": 2},
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).limit(bindparam("l")),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
            params={"l": 3},
        )

    @testing.requires.bound_limit_offset
    def test_bound_offset(self, connection):
        """OFFSET supplied through a named bind parameter."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"o": 2},
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.id).offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"o": 1},
        )

    @testing.requires.bound_limit_offset
    def test_bound_limit_offset(self, connection):
        """LIMIT and OFFSET both supplied through bind parameters."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(bindparam("l"))
            .offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4)],
            params={"l": 2, "o": 1},
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(bindparam("l"))
            .offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"l": 3, "o": 2},
        )

    @testing.requires.fetch_first
    def test_bound_fetch_offset(self, connection):
        """FETCH and OFFSET both supplied through bind parameters."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(bindparam("f"))
            .offset(bindparam("o")),
            [(2, 2, 3), (3, 3, 4)],
            params={"f": 2, "o": 1},
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(bindparam("f"))
            .offset(bindparam("o")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            params={"f": 3, "o": 2},
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_offset(self, connection):
        """OFFSET given as the SQL expression ``1 + 2``."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .offset(literal_column("1") + literal_column("2")),
            [(4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit(self, connection):
        """LIMIT given as the SQL expression ``1 + 2``."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("2")),
            [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit_offset(self, connection):
        """LIMIT and OFFSET both given as the SQL expression ``1 + 1``."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

    @testing.requires.fetch_first
    @testing.requires.fetch_expression
    def test_expr_fetch_offset(self, connection):
        """FETCH and OFFSET both given as the SQL expression ``1 + 1``."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(literal_column("1") + literal_column("1"))
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_simple_limit_expr_offset(self, connection):
        """Literal integer LIMIT combined with an expression OFFSET."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(2)
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5)],
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(3)
            .offset(literal_column("1") + literal_column("1")),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.sql_expression_limit_offset
    def test_expr_limit_simple_offset(self, connection):
        """Expression LIMIT combined with a literal integer OFFSET."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(2),
            [(3, 3, 4), (4, 4, 5)],
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .limit(literal_column("1") + literal_column("1"))
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.fetch_ties
    def test_simple_fetch_ties(self, connection):
        """FETCH ... WITH TIES also returns rows tied on ``x``."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.x.desc()).fetch(1, with_ties=True),
            [(4, 4, 5), (5, 4, 6)],
            set_=True,
        )

        self._assert_result(
            connection,
            select(table).order_by(table.c.x.desc()).fetch(3, with_ties=True),
            [(3, 3, 4), (4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_ties(self, connection):
        """FETCH ... WITH TIES combined with OFFSET."""
        table = self.tables.some_table
        fa = connection.execute(
            select(table)
            .order_by(table.c.x)
            .fetch(2, with_ties=True)
            .offset(2)
        ).fetchall()
        # only the first row is pinned; the rest are compared as a set
        eq_(fa[0], (3, 3, 4))
        eq_(set(fa), {(3, 3, 4), (4, 4, 5), (5, 4, 6)})

    @testing.requires.fetch_ties
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_ties_exact_number(self, connection):
        """WITH TIES plus OFFSET where no extra tied rows are expected."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x)
            .fetch(2, with_ties=True)
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x)
            .fetch(3, with_ties=True)
            .offset(3),
            [(4, 4, 5), (5, 4, 6)],
        )

    @testing.requires.fetch_percent
    def test_simple_fetch_percent(self, connection):
        """FETCH 20 PERCENT of the five rows returns one row."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table).order_by(table.c.id).fetch(20, percent=True),
            [(1, 1, 2)],
        )

    @testing.requires.fetch_percent
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_percent(self, connection):
        """FETCH 40 PERCENT combined with OFFSET 1."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.id)
            .fetch(40, percent=True)
            .offset(1),
            [(2, 2, 3), (3, 3, 4)],
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_percent
    def test_simple_fetch_percent_ties(self, connection):
        """FETCH 20 PERCENT WITH TIES also returns the tied row."""
        table = self.tables.some_table
        self._assert_result(
            connection,
            select(table)
            .order_by(table.c.x.desc())
            .fetch(20, percent=True, with_ties=True),
            [(4, 4, 5), (5, 4, 6)],
            set_=True,
        )

    @testing.requires.fetch_ties
    @testing.requires.fetch_percent
    @testing.requires.fetch_offset_with_options
    def test_fetch_offset_percent_ties(self, connection):
        """FETCH 40 PERCENT WITH TIES combined with OFFSET 2."""
        table = self.tables.some_table
        fa = connection.execute(
            select(table)
            .order_by(table.c.x)
            .fetch(40, percent=True, with_ties=True)
            .offset(2)
        ).fetchall()
        # only the first row is pinned; the rest are compared as a set
        eq_(fa[0], (3, 3, 4))
        eq_(set(fa), {(3, 3, 4), (4, 4, 5), (5, 4, 6)})
class JoinTest(fixtures.TablesTest):
    """Execute inner and outer joins between tables ``a`` and ``b``.

    ``b.a_id`` is a foreign key to ``a.id``.  The fixture data leaves
    ``a`` rows 4 and 5 without any matching ``b`` row.

    """

    __backend__ = True

    def _assert_result(self, select, result, params=()):
        """Execute ``select`` with ``params`` on a fresh connection and
        compare all rows with ``result``."""
        with config.db.connect() as conn:
            eq_(conn.execute(select, params).fetchall(), result)

    @classmethod
    def define_tables(cls, metadata):
        """Register ``a`` and ``b``, with ``b.a_id`` referencing ``a.id``."""
        Table("a", metadata, Column("id", Integer, primary_key=True))
        Table(
            "b",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id", ForeignKey("a.id"), nullable=False),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert five ``a`` rows and four ``b`` rows."""
        connection.execute(
            cls.tables.a.insert(),
            [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}, {"id": 5}],
        )

        connection.execute(
            cls.tables.b.insert(),
            [
                {"id": 1, "a_id": 1},
                {"id": 2, "a_id": 1},
                {"id": 4, "a_id": 2},
                {"id": 5, "a_id": 3},
            ],
        )

    def test_inner_join_fk(self):
        """Inner join with no explicit ON clause given."""
        a, b = self.tables("a", "b")

        stmt = select(a, b).select_from(a.join(b)).order_by(a.c.id, b.c.id)

        self._assert_result(stmt, [(1, 1, 1), (1, 2, 1), (2, 4, 2), (3, 5, 3)])

    def test_inner_join_true(self):
        """Inner join ON true yields the full cartesian product."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.join(b, true()))
            .order_by(a.c.id, b.c.id)
        )

        a_rows = [(1,), (2,), (3,), (4,), (5,)]
        b_rows = [(1, 1), (2, 1), (4, 2), (5, 3)]
        self._assert_result(
            stmt,
            [
                (a_id, b_id, b_a_id)
                for (a_id,), (b_id, b_a_id) in itertools.product(
                    a_rows, b_rows
                )
            ],
        )

    def test_inner_join_false(self):
        """Inner join ON false yields no rows."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.join(b, false()))
            .order_by(a.c.id, b.c.id)
        )
        self._assert_result(stmt, [])

    def test_outer_join_false(self):
        """Outer join ON false yields every ``a`` row padded with NULLs."""
        a, b = self.tables("a", "b")

        stmt = (
            select(a, b)
            .select_from(a.outerjoin(b, false()))
            .order_by(a.c.id, b.c.id)
        )
        self._assert_result(
            stmt,
            [
                (1, None, None),
                (2, None, None),
                (3, None, None),
                (4, None, None),
                (5, None, None),
            ],
        )

    def test_outer_join_fk(self):
        """Outer join with no explicit ON clause given.

        ``a`` rows 4 and 5 have no ``b`` row and must still be returned,
        padded with NULLs.

        """
        a, b = self.tables("a", "b")

        # outerjoin(), not join(): with join() this test was an exact
        # duplicate of test_inner_join_fk
        stmt = (
            select(a, b).select_from(a.outerjoin(b)).order_by(a.c.id, b.c.id)
        )

        self._assert_result(
            stmt,
            [
                (1, 1, 1),
                (1, 2, 1),
                (2, 4, 2),
                (3, 5, 3),
                (4, None, None),
                (5, None, None),
            ],
        )
class CompoundSelectTest(fixtures.TablesTest):
    """Execute UNION statements built from SELECTs against ``some_table``.

    Each test unions the row with id 2 and the row with id 3 and expects
    exactly those two rows back, ordered by id.

    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Register ``some_table`` with an integer pk and two integers."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
        )
        Table("some_table", metadata, *columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert the four fixture rows into ``some_table``."""
        fixture_rows = [
            {"id": 1, "x": 1, "y": 2},
            {"id": 2, "x": 2, "y": 3},
            {"id": 3, "x": 3, "y": 4},
            {"id": 4, "x": 4, "y": 5},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def _assert_result(self, select, result, params=()):
        """Execute ``select`` with ``params`` on a fresh connection and
        compare all rows with ``result``."""
        with config.db.connect() as conn:
            fetched = conn.execute(select, params).fetchall()
            eq_(fetched, result)

    def test_plain_union(self):
        """Plain UNION of two filtered SELECTs."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2)
        second = select(tbl).where(tbl.c.id == 3)

        combined = union(first, second)
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    def test_select_from_plain_union(self):
        """SELECT from an aliased UNION."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2)
        second = select(tbl).where(tbl.c.id == 3)

        combined = union(first, second).alias().select()
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.order_by_col_from_union
    @testing.requires.parens_in_union_contained_select_w_limit_offset
    def test_limit_offset_selectable_in_unions(self):
        """UNION whose member SELECTs carry LIMIT and ORDER BY."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2).limit(1).order_by(tbl.c.id)
        second = select(tbl).where(tbl.c.id == 3).limit(1).order_by(tbl.c.id)

        combined = union(first, second).limit(2)
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.parens_in_union_contained_select_wo_limit_offset
    def test_order_by_selectable_in_unions(self):
        """UNION whose member SELECTs carry ORDER BY but no LIMIT."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2).order_by(tbl.c.id)
        second = select(tbl).where(tbl.c.id == 3).order_by(tbl.c.id)

        combined = union(first, second).limit(2)
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    def test_distinct_selectable_in_unions(self):
        """UNION whose member SELECTs are DISTINCT."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2).distinct()
        second = select(tbl).where(tbl.c.id == 3).distinct()

        combined = union(first, second).limit(2)
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])

    @testing.requires.parens_in_union_contained_select_w_limit_offset
    def test_limit_offset_in_unions_from_alias(self):
        """SELECT with LIMIT from an aliased UNION of limited SELECTs."""
        tbl = self.tables.some_table
        first = select(tbl).where(tbl.c.id == 2).limit(1).order_by(tbl.c.id)
        second = select(tbl).where(tbl.c.id == 3).limit(1).order_by(tbl.c.id)

        # this necessarily has double parens
        aliased = union(first, second).alias()
        stmt = aliased.select().limit(2).order_by(aliased.c.id)
        self._assert_result(stmt, [(2, 2, 3), (3, 3, 4)])

    def test_limit_offset_aliased_selectable_in_unions(self):
        """UNION of SELECTs that each wrap an aliased, limited SELECT."""
        tbl = self.tables.some_table

        first_inner = (
            select(tbl).where(tbl.c.id == 2).limit(1).order_by(tbl.c.id)
        )
        first = first_inner.alias().select()

        second_inner = (
            select(tbl).where(tbl.c.id == 3).limit(1).order_by(tbl.c.id)
        )
        second = second_inner.alias().select()

        combined = union(first, second).limit(2)
        ordered = combined.order_by(combined.selected_columns.id)
        self._assert_result(ordered, [(2, 2, 3), (3, 3, 4)])
class PostCompileParamsTest(
    AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest
):
    """Check how ``literal_execute=True`` bind parameters appear in the
    compiled string and in the SQL recorded at execution time."""

    __backend__ = True

    __requires__ = ("standard_cursor_sql",)

    @classmethod
    def define_tables(cls, metadata):
        """Register ``some_table`` with integer and string columns."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
            Column("z", String(50)),
        )
        Table("some_table", metadata, *columns)

    @classmethod
    def insert_data(cls, connection):
        """Insert the four fixture rows into ``some_table``."""
        fixture_rows = [
            {"id": 1, "x": 1, "y": 2, "z": "z1"},
            {"id": 2, "x": 2, "y": 3, "z": "z2"},
            {"id": 3, "x": 3, "y": 4, "z": "z3"},
            {"id": 4, "x": 4, "y": 5, "z": "z4"},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def test_compile(self):
        """Without a value the parameter compiles to a POSTCOMPILE token."""
        tbl = self.tables.some_table

        param = bindparam("q", literal_execute=True)
        stmt = select(tbl.c.id).where(tbl.c.x == param)

        self.assert_compile(
            stmt,
            "SELECT some_table.id FROM some_table "
            "WHERE some_table.x = __[POSTCOMPILE_q]",
            {},
        )

    def test_compile_literal_binds(self):
        """With ``literal_binds`` the parameter value is rendered inline."""
        tbl = self.tables.some_table

        param = bindparam("q", 10, literal_execute=True)
        stmt = select(tbl.c.id).where(tbl.c.x == param)

        self.assert_compile(
            stmt,
            "SELECT some_table.id FROM some_table WHERE some_table.x = 10",
            {},
            literal_binds=True,
        )

    def test_execute(self):
        """The recorded cursor SQL has the value inline and no params."""
        tbl = self.tables.some_table

        param = bindparam("q", literal_execute=True)
        stmt = select(tbl.c.id).where(tbl.c.x == param)

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, dict(q=10))

        no_params = () if config.db.dialect.positional else {}
        asserter.assert_(
            CursorSQL(
                "SELECT some_table.id \nFROM some_table "
                "\nWHERE some_table.x = 10",
                no_params,
            )
        )

    def test_execute_expanding_plus_literal_execute(self):
        """An expanding IN parameter is rendered as an inline value list."""
        tbl = self.tables.some_table

        param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(tbl.c.id).where(tbl.c.x.in_(param))

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, dict(q=[5, 6, 7]))

        no_params = () if config.db.dialect.positional else {}
        asserter.assert_(
            CursorSQL(
                "SELECT some_table.id \nFROM some_table "
                "\nWHERE some_table.x IN (5, 6, 7)",
                no_params,
            )
        )

    @testing.requires.tuple_in
    def test_execute_tuple_expanding_plus_literal_execute(self):
        """An expanding tuple IN over two integer columns is rendered
        inline."""
        tbl = self.tables.some_table

        param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(tbl.c.id).where(tuple_(tbl.c.x, tbl.c.y).in_(param))

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, dict(q=[(5, 10), (12, 18)]))

        # the expected SQL has a VALUES keyword only when the dialect
        # sets tuple_in_values
        values_kw = "VALUES " if config.db.dialect.tuple_in_values else ""
        no_params = () if config.db.dialect.positional else {}
        asserter.assert_(
            CursorSQL(
                "SELECT some_table.id \nFROM some_table "
                "\nWHERE (some_table.x, some_table.y) "
                "IN (%s(5, 10), (12, 18))" % (values_kw,),
                no_params,
            )
        )

    @testing.requires.tuple_in
    def test_execute_tuple_expanding_plus_literal_heterogeneous_execute(self):
        """An expanding tuple IN over an integer and a string column is
        rendered inline."""
        tbl = self.tables.some_table

        param = bindparam("q", expanding=True, literal_execute=True)
        stmt = select(tbl.c.id).where(tuple_(tbl.c.x, tbl.c.z).in_(param))

        with self.sql_execution_asserter() as asserter:
            with config.db.connect() as conn:
                conn.execute(stmt, dict(q=[(5, "z1"), (12, "z3")]))

        # the expected SQL has a VALUES keyword only when the dialect
        # sets tuple_in_values
        values_kw = "VALUES " if config.db.dialect.tuple_in_values else ""
        no_params = () if config.db.dialect.positional else {}
        asserter.assert_(
            CursorSQL(
                "SELECT some_table.id \nFROM some_table "
                "\nWHERE (some_table.x, some_table.z) "
                "IN (%s(5, 'z1'), (12, 'z3'))" % (values_kw,),
                no_params,
            )
        )
class ExpandingBoundInTest(fixtures.TablesTest):
    """Round-trip tests for IN / NOT IN expressions against a backend.

    Covers scalar and tuple comparisons, with the candidate values given
    either directly or through a bound parameter, including empty
    collections.
    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``some_table``: integer id/x/y and a string column z."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("x", Integer),
            Column("y", Integer),
            Column("z", String(50)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert the four fixture rows the tests select against."""
        fixture_rows = [
            {"id": 1, "x": 1, "y": 2, "z": "z1"},
            {"id": 2, "x": 2, "y": 3, "z": "z2"},
            {"id": 3, "x": 3, "y": 4, "z": "z3"},
            {"id": 4, "x": 4, "y": 5, "z": "z4"},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def _assert_result(self, select, result, params=()):
        """Execute ``select`` with ``params``; all rows must equal ``result``."""
        with config.db.connect() as conn:
            fetched = conn.execute(select, params).fetchall()
            eq_(fetched, result)

    def test_multiple_empty_sets_bindparam(self):
        # two empty IN sets in one statement: any anonymous aliasing
        # the dialect uses has to cope with duplicates
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.x.in_(bindparam("q")))
            .where(t.c.y.in_(bindparam("p")))
            .order_by(t.c.id)
        )
        empty_params = {"q": [], "p": []}
        self._assert_result(query, [], params=empty_params)

    def test_multiple_empty_sets_direct(self):
        # two empty IN sets in one statement: any anonymous aliasing
        # the dialect uses has to cope with duplicates
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.x.in_([]))
            .where(t.c.y.in_([]))
            .order_by(t.c.id)
        )
        self._assert_result(query, [])

    @testing.requires.tuple_in_w_empty
    def test_empty_heterogeneous_tuples_bindparam(self):
        """An empty parameter for an (int, str) tuple IN matches nothing."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.z).in_(bindparam("q"))
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        self._assert_result(query, [], params={"q": []})

    @testing.requires.tuple_in_w_empty
    def test_empty_heterogeneous_tuples_direct(self):
        """Alternate empty and populated (int, str) tuple lists."""
        t = self.tables.some_table

        def check(candidates, expected_ids):
            query = (
                select(t.c.id)
                .where(tuple_(t.c.x, t.c.z).in_(candidates))
                .order_by(t.c.id)
            )
            self._assert_result(query, expected_ids)

        # empty, populated, then empty again
        check([], [])
        check([(2, "z2"), (3, "z3"), (4, "z4")], [(2,), (3,), (4,)])
        check([], [])

    @testing.requires.tuple_in_w_empty
    def test_empty_homogeneous_tuples_bindparam(self):
        """An empty parameter for an (int, int) tuple IN matches nothing."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.y).in_(bindparam("q"))
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        self._assert_result(query, [], params={"q": []})

    @testing.requires.tuple_in_w_empty
    def test_empty_homogeneous_tuples_direct(self):
        """Alternate empty and populated (int, int) tuple lists."""
        t = self.tables.some_table

        def check(candidates, expected_ids):
            query = (
                select(t.c.id)
                .where(tuple_(t.c.x, t.c.y).in_(candidates))
                .order_by(t.c.id)
            )
            self._assert_result(query, expected_ids)

        # empty, populated, then empty again
        check([], [])
        check([(1, 2), (2, 3), (3, 4)], [(1,), (2,), (3,)])
        check([], [])

    def test_bound_in_scalar_bindparam(self):
        """Scalar IN with the list supplied through parameter ``q``."""
        t = self.tables.some_table
        query = (
            select(t.c.id).where(t.c.x.in_(bindparam("q"))).order_by(t.c.id)
        )
        self._assert_result(
            query, [(2,), (3,), (4,)], params={"q": [2, 3, 4]}
        )

    def test_bound_in_scalar_direct(self):
        """Scalar IN with the list given inline."""
        t = self.tables.some_table
        query = select(t.c.id).where(t.c.x.in_([2, 3, 4])).order_by(t.c.id)
        self._assert_result(query, [(2,), (3,), (4,)])

    def test_nonempty_in_plus_empty_notin(self):
        """A populated IN combined with an empty NOT IN."""
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.x.in_([2, 3]))
            .where(t.c.id.not_in([]))
            .order_by(t.c.id)
        )
        self._assert_result(query, [(2,), (3,)])

    def test_empty_in_plus_notempty_notin(self):
        """An empty IN combined with a populated NOT IN."""
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.x.in_([]))
            .where(t.c.id.not_in([2, 3]))
            .order_by(t.c.id)
        )
        self._assert_result(query, [])

    def test_typed_str_in(self):
        """Related to #7292.

        The bound parameter carries an explicit type, so the element type
        is unambiguous.

        """
        typed_param = bindparam("q", type_=String, expanding=True)
        query = text(
            "select id FROM some_table WHERE z IN :q ORDER BY id"
        ).bindparams(typed_param)
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": ["z2", "z3", "z4"]},
        )

    def test_untyped_str_in(self):
        """Related to #7292.

        With no type on the expression, the element types are inspected.
        A Sequence element indicates a tuple IN, but strings and bytes
        must not be treated that way.

        """
        untyped_param = bindparam("q", expanding=True)
        query = text(
            "select id FROM some_table WHERE z IN :q ORDER BY id"
        ).bindparams(untyped_param)
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": ["z2", "z3", "z4"]},
        )

    @testing.requires.tuple_in
    def test_bound_in_two_tuple_bindparam(self):
        """(int, int) tuple IN with the pairs supplied through ``q``."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.y).in_(bindparam("q"))
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        pairs = [(2, 3), (3, 4), (4, 5)]
        self._assert_result(query, [(2,), (3,), (4,)], params={"q": pairs})

    @testing.requires.tuple_in
    def test_bound_in_two_tuple_direct(self):
        """(int, int) tuple IN with the pairs given inline."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.y).in_([(2, 3), (3, 4), (4, 5)])
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        self._assert_result(query, [(2,), (3,), (4,)])

    @testing.requires.tuple_in
    def test_bound_in_heterogeneous_two_tuple_bindparam(self):
        """(int, str) tuple IN with the pairs supplied through ``q``."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.z).in_(bindparam("q"))
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
        )

    @testing.requires.tuple_in
    def test_bound_in_heterogeneous_two_tuple_direct(self):
        """(int, str) tuple IN with the pairs given inline."""
        t = self.tables.some_table
        criterion = tuple_(t.c.x, t.c.z).in_(
            [(2, "z2"), (3, "z3"), (4, "z4")]
        )
        query = select(t.c.id).where(criterion).order_by(t.c.id)
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
        )

    @testing.requires.tuple_in
    def test_bound_in_heterogeneous_two_tuple_text_bindparam(self):
        # expanding is set explicitly; without it this currently
        # becomes ARRAY
        expanding_param = bindparam("q", expanding=True)
        query = text(
            "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
        ).bindparams(expanding_param)
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
        )

    @testing.requires.tuple_in
    def test_bound_in_heterogeneous_two_tuple_typed_bindparam_non_tuple(self):
        """Tuple IN where the elements are Sequences that are not tuples,
        with an explicit ``TupleType`` on the parameter."""

        class LikeATuple(collections_abc.Sequence):
            # minimal Sequence wrapper around the positional arguments
            def __init__(self, *data):
                self._data = data

            def __iter__(self):
                return iter(self._data)

            def __getitem__(self, idx):
                return self._data[idx]

            def __len__(self):
                return len(self._data)

        typed_param = bindparam(
            "q", type_=TupleType(Integer(), String()), expanding=True
        )
        query = text(
            "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
        ).bindparams(typed_param)

        candidates = [
            LikeATuple(2, "z2"),
            LikeATuple(3, "z3"),
            LikeATuple(4, "z4"),
        ]
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": candidates},
        )

    @testing.requires.tuple_in
    def test_bound_in_heterogeneous_two_tuple_text_bindparam_non_tuple(self):
        # expanding is set explicitly; without it this currently
        # becomes ARRAY

        class LikeATuple(collections_abc.Sequence):
            # minimal Sequence wrapper around the positional arguments
            def __init__(self, *data):
                self._data = data

            def __iter__(self):
                return iter(self._data)

            def __getitem__(self, idx):
                return self._data[idx]

            def __len__(self):
                return len(self._data)

        expanding_param = bindparam("q", expanding=True)
        query = text(
            "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
        ).bindparams(expanding_param)

        candidates = [
            LikeATuple(2, "z2"),
            LikeATuple(3, "z3"),
            LikeATuple(4, "z4"),
        ]
        self._assert_result(
            query,
            [(2,), (3,), (4,)],
            params={"q": candidates},
        )

    def test_empty_set_against_integer_bindparam(self):
        """Integer column IN an empty parameter list."""
        t = self.tables.some_table
        query = (
            select(t.c.id).where(t.c.x.in_(bindparam("q"))).order_by(t.c.id)
        )
        self._assert_result(query, [], params={"q": []})

    def test_empty_set_against_integer_direct(self):
        """Integer column IN an empty inline list."""
        t = self.tables.some_table
        query = select(t.c.id).where(t.c.x.in_([])).order_by(t.c.id)
        self._assert_result(query, [])

    def test_empty_set_against_integer_negation_bindparam(self):
        """Integer column NOT IN an empty parameter list."""
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.x.not_in(bindparam("q")))
            .order_by(t.c.id)
        )
        every_id = [(1,), (2,), (3,), (4,)]
        self._assert_result(query, every_id, params={"q": []})

    def test_empty_set_against_integer_negation_direct(self):
        """Integer column NOT IN an empty inline list."""
        t = self.tables.some_table
        query = select(t.c.id).where(t.c.x.not_in([])).order_by(t.c.id)
        self._assert_result(query, [(1,), (2,), (3,), (4,)])

    def test_empty_set_against_string_bindparam(self):
        """String column IN an empty parameter list."""
        t = self.tables.some_table
        query = (
            select(t.c.id).where(t.c.z.in_(bindparam("q"))).order_by(t.c.id)
        )
        self._assert_result(query, [], params={"q": []})

    def test_empty_set_against_string_direct(self):
        """String column IN an empty inline list."""
        t = self.tables.some_table
        query = select(t.c.id).where(t.c.z.in_([])).order_by(t.c.id)
        self._assert_result(query, [])

    def test_empty_set_against_string_negation_bindparam(self):
        """String column NOT IN an empty parameter list."""
        t = self.tables.some_table
        query = (
            select(t.c.id)
            .where(t.c.z.not_in(bindparam("q")))
            .order_by(t.c.id)
        )
        every_id = [(1,), (2,), (3,), (4,)]
        self._assert_result(query, every_id, params={"q": []})

    def test_empty_set_against_string_negation_direct(self):
        """String column NOT IN an empty inline list."""
        t = self.tables.some_table
        query = select(t.c.id).where(t.c.z.not_in([])).order_by(t.c.id)
        self._assert_result(query, [(1,), (2,), (3,), (4,)])

    def test_null_in_empty_set_is_false_bindparam(self, connection):
        """NULL IN (empty parameter) must take the CASE ``else_`` branch."""
        null_in_empty = null().in_(bindparam("foo", value=()))
        query = select(
            case(
                (null_in_empty, true()),
                else_=false(),
            )
        )
        outcome = connection.execute(query).fetchone()[0]
        # accepted as either a boolean False or the integer 0
        in_(outcome, (False, 0))

    def test_null_in_empty_set_is_false_direct(self, connection):
        """NULL IN (empty inline list) must take the CASE ``else_`` branch."""
        null_in_empty = null().in_([])
        query = select(
            case(
                (null_in_empty, true()),
                else_=false(),
            )
        )
        outcome = connection.execute(query).fetchone()[0]
        # accepted as either a boolean False or the integer 0
        in_(outcome, (False, 0))
class LikeFunctionsTest(fixtures.TablesTest):
    """Backend tests for startswith / endswith / contains and the regexp
    operators, including the ``escape`` and ``autoescape`` options."""

    __backend__ = True

    run_inserts = "once"
    run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``some_table`` with an id and a string ``data`` column."""
        Table(
            "some_table",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert strings containing ``%``, ``_``, ``/``, ``#`` plus a NULL."""
        fixture_rows = [
            {"id": 1, "data": "abcdefg"},
            {"id": 2, "data": "ab/cdefg"},
            {"id": 3, "data": "ab%cdefg"},
            {"id": 4, "data": "ab_cdefg"},
            {"id": 5, "data": "abcde/fg"},
            {"id": 6, "data": "abcde%fg"},
            {"id": 7, "data": "ab#cdefg"},
            {"id": 8, "data": "ab9cdefg"},
            {"id": 9, "data": "abcde#fg"},
            {"id": 10, "data": "abcd9fg"},
            {"id": 11, "data": None},
        ]
        connection.execute(cls.tables.some_table.insert(), fixture_rows)

    def _test(self, expr, expected):
        """Assert that the set of ids matching ``expr`` equals ``expected``."""
        some_table = self.tables.some_table

        with config.db.connect() as conn:
            matched_ids = {
                row_id
                for (row_id,) in conn.execute(
                    select(some_table.c.id).where(expr)
                )
            }

        eq_(matched_ids, expected)

    def test_startswith_unescaped(self):
        # the unescaped "%" behaves as a wildcard
        data = self.tables.some_table.c.data
        self._test(data.startswith("ab%c"), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})

    def test_startswith_autoescape(self):
        data = self.tables.some_table.c.data
        self._test(data.startswith("ab%c", autoescape=True), {3})

    def test_startswith_sqlexpr(self):
        data = self.tables.some_table.c.data
        prefix_expr = literal_column("'ab%c'")
        self._test(
            data.startswith(prefix_expr),
            {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
        )

    def test_startswith_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.startswith("ab##c", escape="#"), {7})

    def test_startswith_autoescape_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.startswith("ab%c", autoescape=True, escape="#"), {3})
        self._test(data.startswith("ab#c", autoescape=True, escape="#"), {7})

    def test_endswith_unescaped(self):
        # the unescaped "%" behaves as a wildcard
        data = self.tables.some_table.c.data
        self._test(data.endswith("e%fg"), {1, 2, 3, 4, 5, 6, 7, 8, 9})

    def test_endswith_sqlexpr(self):
        data = self.tables.some_table.c.data
        suffix_expr = literal_column("'e%fg'")
        self._test(data.endswith(suffix_expr), {1, 2, 3, 4, 5, 6, 7, 8, 9})

    def test_endswith_autoescape(self):
        data = self.tables.some_table.c.data
        self._test(data.endswith("e%fg", autoescape=True), {6})

    def test_endswith_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.endswith("e##fg", escape="#"), {9})

    def test_endswith_autoescape_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.endswith("e%fg", autoescape=True, escape="#"), {6})
        self._test(data.endswith("e#fg", autoescape=True, escape="#"), {9})

    def test_contains_unescaped(self):
        # the unescaped "%" behaves as a wildcard
        data = self.tables.some_table.c.data
        self._test(data.contains("b%cde"), {1, 2, 3, 4, 5, 6, 7, 8, 9})

    def test_contains_autoescape(self):
        data = self.tables.some_table.c.data
        self._test(data.contains("b%cde", autoescape=True), {3})

    def test_contains_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.contains("b##cde", escape="#"), {7})

    def test_contains_autoescape_escape(self):
        data = self.tables.some_table.c.data
        self._test(data.contains("b%cd", autoescape=True, escape="#"), {3})
        self._test(data.contains("b#cd", autoescape=True, escape="#"), {7})

    @testing.requires.regexp_match
    def test_not_regexp_match(self):
        data = self.tables.some_table.c.data
        negated = ~data.regexp_match("a.cde")
        self._test(negated, {2, 3, 4, 7, 8, 10})

    @testing.requires.regexp_replace
    def test_regexp_replace(self):
        data = self.tables.some_table.c.data
        replaced = data.regexp_replace("a.cde", "FOO")
        self._test(replaced.contains("FOO"), {1, 5, 6, 9})

    @testing.requires.regexp_match
    @testing.combinations(
        ("a.cde", {1, 5, 6, 9}),
        ("abc", {1, 5, 6, 9, 10}),
        ("^abc", {1, 5, 6, 9, 10}),
        ("9cde", {8}),
        ("^a", set(range(1, 11))),
        ("(b|c)", set(range(1, 11))),
        ("^(b|c)", set()),
    )
    def test_regexp_match(self, text, expected):
        """Each pattern must match exactly the listed set of ids."""
        data = self.tables.some_table.c.data
        self._test(data.regexp_match(text), expected)
class ComputedColumnTest(fixtures.TablesTest):
    """Select from a table whose ``area`` and ``perimeter`` columns are
    ``Computed`` from ``side``."""

    __backend__ = True
    __requires__ = ("computed_columns",)

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``square`` with two computed integer columns."""
        Table(
            "square",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("side", Integer),
            Column("area", Integer, Computed("side * side")),
            Column("perimeter", Integer, Computed("4 * side")),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert two squares, supplying only ``id`` and ``side``."""
        squares = [{"id": 1, "side": 10}, {"id": 10, "side": 42}]
        connection.execute(cls.tables.square.insert(), squares)

    def test_select_all(self):
        """``SELECT *`` returns the computed values next to the stored ones."""
        square = self.tables.square
        with config.db.connect() as conn:
            rows = conn.execute(
                select(text("*")).select_from(square).order_by(square.c.id)
            ).fetchall()
            eq_(rows, [(1, 10, 100, 40), (10, 42, 1764, 168)])

    def test_select_columns(self):
        """Selecting only the computed columns returns their values."""
        square = self.tables.square
        with config.db.connect() as conn:
            rows = conn.execute(
                select(square.c.area, square.c.perimeter)
                .select_from(square)
                .order_by(square.c.id)
            ).fetchall()
            eq_(rows, [(100, 40), (1764, 168)])
class IdentityColumnTest(fixtures.TablesTest):
    """Backend tests for primary keys declared with ``Identity``."""

    __backend__ = True
    __requires__ = ("identity_columns",)
    run_inserts = "once"
    run_deletes = "once"

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``tbl_a`` (always-generated, starting at 42) and
        ``tbl_b`` (starting at 0 with an increment of -5)."""
        ascending_identity = Identity(
            always=True, start=42, nominvalue=True, nomaxvalue=True
        )
        Table(
            "tbl_a",
            metadata,
            Column(
                "id",
                Integer,
                ascending_identity,
                primary_key=True,
            ),
            Column("desc", String(100)),
        )

        descending_identity = Identity(
            increment=-5, start=0, minvalue=-1000, maxvalue=0
        )
        Table(
            "tbl_b",
            metadata,
            Column(
                "id",
                Integer,
                descending_identity,
                primary_key=True,
            ),
            Column("desc", String(100)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert generated-id rows into both tables, then one explicit
        id into ``tbl_b``."""
        tbl_a = cls.tables.tbl_a
        tbl_b = cls.tables.tbl_b

        connection.execute(tbl_a.insert(), [{"desc": "a"}, {"desc": "b"}])
        connection.execute(tbl_b.insert(), [{"desc": "a"}, {"desc": "b"}])
        # tbl_b's identity is not declared "always", so an explicit id
        # is passed here
        connection.execute(tbl_b.insert(), [{"id": 42, "desc": "c"}])

    def test_select_all(self, connection):
        """``SELECT *`` from each table shows the generated ids."""
        tbl_a = self.tables.tbl_a
        rows_a = connection.execute(
            select(text("*")).select_from(tbl_a).order_by(tbl_a.c.id)
        ).fetchall()
        eq_(rows_a, [(42, "a"), (43, "b")])

        tbl_b = self.tables.tbl_b
        rows_b = connection.execute(
            select(text("*")).select_from(tbl_b).order_by(tbl_b.c.id)
        ).fetchall()
        eq_(rows_b, [(-5, "b"), (0, "a"), (42, "c")])

    def test_select_columns(self, connection):
        """Selecting the identity column of ``tbl_a`` returns 42 and 43."""
        id_col = self.tables.tbl_a.c.id
        rows = connection.execute(select(id_col).order_by(id_col)).fetchall()
        eq_(rows, [(42,), (43,)])

    @testing.requires.identity_columns_standard
    def test_insert_always_error(self, connection):
        """An explicit id for the ``always=True`` identity must raise."""

        def insert_explicit_id():
            connection.execute(
                self.tables.tbl_a.insert(),
                [{"id": 200, "desc": "a"}],
            )

        assert_raises((DatabaseError, ProgrammingError), insert_explicit_id)
class IdentityAutoincrementTest(fixtures.TablesTest):
    """Backend test for a primary key declared with both ``Identity()``
    and ``autoincrement=True``."""

    __backend__ = True
    __requires__ = ("autoincrement_without_sequence",)

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``tbl`` with an identity / autoincrement primary key."""
        Table(
            "tbl",
            metadata,
            Column(
                "id",
                Integer,
                Identity(),
                primary_key=True,
                autoincrement=True,
            ),
            Column("desc", String(100)),
        )

    def test_autoincrement_with_identity(self, connection):
        """Insert a row without an id and expect it to be stored as id 1."""
        # the INSERT result itself is not inspected; the original bound it
        # to a name that was overwritten on the next line without being read
        connection.execute(self.tables.tbl.insert(), {"desc": "row"})
        first_row = connection.execute(self.tables.tbl.select()).first()
        eq_(first_row, (1, "row"))
class ExistsTest(fixtures.TablesTest):
    """Backend tests for ``EXISTS`` used as a WHERE criterion."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``stuff`` with an id and a string ``data`` column."""
        Table(
            "stuff",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String(50)),
        )

    @classmethod
    def insert_data(cls, connection):
        """Insert three "some data" rows and one "some other data" row."""
        fixture_rows = [
            {"id": 1, "data": "some data"},
            {"id": 2, "data": "some data"},
            {"id": 3, "data": "some data"},
            {"id": 4, "data": "some other data"},
        ]
        connection.execute(cls.tables.stuff.insert(), fixture_rows)

    def test_select_exists(self, connection):
        """EXISTS over matching rows lets the literal row through."""
        stuff = self.tables.stuff
        has_some_data = exists().where(stuff.c.data == "some data")
        rows = connection.execute(
            select(literal(1)).where(has_some_data)
        ).fetchall()
        eq_(rows, [(1,)])

    def test_select_exists_false(self, connection):
        """EXISTS over no matching rows yields an empty result."""
        stuff = self.tables.stuff
        has_no_data = exists().where(stuff.c.data == "no data")
        rows = connection.execute(
            select(literal(1)).where(has_no_data)
        ).fetchall()
        eq_(rows, [])
class DistinctOnTest(AssertsCompiledSQL, fixtures.TablesTest):
    """Compile ``distinct()`` with a column argument and check the
    rendered SQL and the deprecation warning."""

    __backend__ = True

    @testing.fails_if(testing.requires.supports_distinct_on)
    def test_distinct_on(self):
        """The column passed to ``distinct()`` is left out of the SQL and a
        deprecation warning is expected."""
        query = select("*").distinct(column("q")).select_from(table("foo"))
        expected_sql = "SELECT DISTINCT * FROM foo"

        with testing.expect_deprecated(
            "DISTINCT ON is currently supported only by the PostgreSQL "
        ):
            self.assert_compile(query, expected_sql)
class IsOrIsNotDistinctFromTest(fixtures.TablesTest):
    """Backend tests for ``is_distinct_from`` / ``is_not_distinct_from``
    between two nullable integer columns."""

    __backend__ = True
    __requires__ = ("supports_is_distinct_from",)

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``is_distinct_test`` with nullable ``col_a``/``col_b``."""
        Table(
            "is_distinct_test",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("col_a", Integer, nullable=True),
            Column("col_b", Integer, nullable=True),
        )

    @testing.combinations(
        ("both_int_different", 0, 1, 1),
        ("both_int_same", 1, 1, 0),
        ("one_null_first", None, 1, 1),
        ("one_null_second", 0, None, 1),
        ("both_null", None, None, 0),
        id_="iaaa",
        argnames="col_a_value, col_b_value, expected_row_count_for_is",
    )
    def test_is_or_is_not_distinct_from(
        self, col_a_value, col_b_value, expected_row_count_for_is, connection
    ):
        """Insert one row, then count matches for both operators.

        ``expected_row_count_for_is`` is the number of rows expected from
        ``is_distinct_from``; ``is_not_distinct_from`` is expected to
        return the complementary count (1 when the former is 0, else 0).
        """
        tbl = self.tables.is_distinct_test
        connection.execute(
            tbl.insert(),
            [{"id": 1, "col_a": col_a_value, "col_b": col_b_value}],
        )

        def matching_row_count(criterion):
            matched = connection.execute(
                tbl.select().where(criterion)
            ).fetchall()
            return len(matched)

        eq_(
            matching_row_count(tbl.c.col_a.is_distinct_from(tbl.c.col_b)),
            expected_row_count_for_is,
        )

        if expected_row_count_for_is == 0:
            expected_row_count_for_is_not = 1
        else:
            expected_row_count_for_is_not = 0

        eq_(
            matching_row_count(tbl.c.col_a.is_not_distinct_from(tbl.c.col_b)),
            expected_row_count_for_is_not,
        )
|