test_reflection.py 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736
  1. import operator
  2. import re
  3. import sqlalchemy as sa
  4. from .. import config
  5. from .. import engines
  6. from .. import eq_
  7. from .. import expect_warnings
  8. from .. import fixtures
  9. from .. import is_
  10. from ..provision import get_temp_table_name
  11. from ..provision import temp_table_keyword_args
  12. from ..schema import Column
  13. from ..schema import Table
  14. from ... import event
  15. from ... import ForeignKey
  16. from ... import func
  17. from ... import Identity
  18. from ... import inspect
  19. from ... import Integer
  20. from ... import MetaData
  21. from ... import String
  22. from ... import testing
  23. from ... import types as sql_types
  24. from ...schema import DDL
  25. from ...schema import Index
  26. from ...sql.elements import quoted_name
  27. from ...sql.schema import BLANK_SCHEMA
  28. from ...testing import is_false
  29. from ...testing import is_true
  30. metadata, users = None, None
  31. class HasTableTest(fixtures.TablesTest):
  32. __backend__ = True
  33. @classmethod
  34. def define_tables(cls, metadata):
  35. Table(
  36. "test_table",
  37. metadata,
  38. Column("id", Integer, primary_key=True),
  39. Column("data", String(50)),
  40. )
  41. if testing.requires.schemas.enabled:
  42. Table(
  43. "test_table_s",
  44. metadata,
  45. Column("id", Integer, primary_key=True),
  46. Column("data", String(50)),
  47. schema=config.test_schema,
  48. )
  49. def test_has_table(self):
  50. with config.db.begin() as conn:
  51. is_true(config.db.dialect.has_table(conn, "test_table"))
  52. is_false(config.db.dialect.has_table(conn, "test_table_s"))
  53. is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
  54. @testing.requires.schemas
  55. def test_has_table_schema(self):
  56. with config.db.begin() as conn:
  57. is_false(
  58. config.db.dialect.has_table(
  59. conn, "test_table", schema=config.test_schema
  60. )
  61. )
  62. is_true(
  63. config.db.dialect.has_table(
  64. conn, "test_table_s", schema=config.test_schema
  65. )
  66. )
  67. is_false(
  68. config.db.dialect.has_table(
  69. conn, "nonexistent_table", schema=config.test_schema
  70. )
  71. )
  72. class HasIndexTest(fixtures.TablesTest):
  73. __backend__ = True
  74. @classmethod
  75. def define_tables(cls, metadata):
  76. tt = Table(
  77. "test_table",
  78. metadata,
  79. Column("id", Integer, primary_key=True),
  80. Column("data", String(50)),
  81. )
  82. Index("my_idx", tt.c.data)
  83. if testing.requires.schemas.enabled:
  84. tt = Table(
  85. "test_table",
  86. metadata,
  87. Column("id", Integer, primary_key=True),
  88. Column("data", String(50)),
  89. schema=config.test_schema,
  90. )
  91. Index("my_idx_s", tt.c.data)
  92. def test_has_index(self):
  93. with config.db.begin() as conn:
  94. assert config.db.dialect.has_index(conn, "test_table", "my_idx")
  95. assert not config.db.dialect.has_index(
  96. conn, "test_table", "my_idx_s"
  97. )
  98. assert not config.db.dialect.has_index(
  99. conn, "nonexistent_table", "my_idx"
  100. )
  101. assert not config.db.dialect.has_index(
  102. conn, "test_table", "nonexistent_idx"
  103. )
  104. @testing.requires.schemas
  105. def test_has_index_schema(self):
  106. with config.db.begin() as conn:
  107. assert config.db.dialect.has_index(
  108. conn, "test_table", "my_idx_s", schema=config.test_schema
  109. )
  110. assert not config.db.dialect.has_index(
  111. conn, "test_table", "my_idx", schema=config.test_schema
  112. )
  113. assert not config.db.dialect.has_index(
  114. conn,
  115. "nonexistent_table",
  116. "my_idx_s",
  117. schema=config.test_schema,
  118. )
  119. assert not config.db.dialect.has_index(
  120. conn,
  121. "test_table",
  122. "nonexistent_idx_s",
  123. schema=config.test_schema,
  124. )
  125. class QuotedNameArgumentTest(fixtures.TablesTest):
  126. run_create_tables = "once"
  127. __backend__ = True
  128. @classmethod
  129. def define_tables(cls, metadata):
  130. Table(
  131. "quote ' one",
  132. metadata,
  133. Column("id", Integer),
  134. Column("name", String(50)),
  135. Column("data", String(50)),
  136. Column("related_id", Integer),
  137. sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
  138. sa.Index("ix quote ' one", "name"),
  139. sa.UniqueConstraint(
  140. "data",
  141. name="uq quote' one",
  142. ),
  143. sa.ForeignKeyConstraint(
  144. ["id"], ["related.id"], name="fk quote ' one"
  145. ),
  146. sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
  147. comment=r"""quote ' one comment""",
  148. test_needs_fk=True,
  149. )
  150. if testing.requires.symbol_names_w_double_quote.enabled:
  151. Table(
  152. 'quote " two',
  153. metadata,
  154. Column("id", Integer),
  155. Column("name", String(50)),
  156. Column("data", String(50)),
  157. Column("related_id", Integer),
  158. sa.PrimaryKeyConstraint("id", name='pk quote " two'),
  159. sa.Index('ix quote " two', "name"),
  160. sa.UniqueConstraint(
  161. "data",
  162. name='uq quote" two',
  163. ),
  164. sa.ForeignKeyConstraint(
  165. ["id"], ["related.id"], name='fk quote " two'
  166. ),
  167. sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
  168. comment=r"""quote " two comment""",
  169. test_needs_fk=True,
  170. )
  171. Table(
  172. "related",
  173. metadata,
  174. Column("id", Integer, primary_key=True),
  175. Column("related", Integer),
  176. test_needs_fk=True,
  177. )
  178. if testing.requires.view_column_reflection.enabled:
  179. if testing.requires.symbol_names_w_double_quote.enabled:
  180. names = [
  181. "quote ' one",
  182. 'quote " two',
  183. ]
  184. else:
  185. names = [
  186. "quote ' one",
  187. ]
  188. for name in names:
  189. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  190. config.db.dialect.identifier_preparer.quote(
  191. "view %s" % name
  192. ),
  193. config.db.dialect.identifier_preparer.quote(name),
  194. )
  195. event.listen(metadata, "after_create", DDL(query))
  196. event.listen(
  197. metadata,
  198. "before_drop",
  199. DDL(
  200. "DROP VIEW %s"
  201. % config.db.dialect.identifier_preparer.quote(
  202. "view %s" % name
  203. )
  204. ),
  205. )
  206. def quote_fixtures(fn):
  207. return testing.combinations(
  208. ("quote ' one",),
  209. ('quote " two', testing.requires.symbol_names_w_double_quote),
  210. )(fn)
  211. @quote_fixtures
  212. def test_get_table_options(self, name):
  213. insp = inspect(config.db)
  214. insp.get_table_options(name)
  215. @quote_fixtures
  216. @testing.requires.view_column_reflection
  217. def test_get_view_definition(self, name):
  218. insp = inspect(config.db)
  219. assert insp.get_view_definition("view %s" % name)
  220. @quote_fixtures
  221. def test_get_columns(self, name):
  222. insp = inspect(config.db)
  223. assert insp.get_columns(name)
  224. @quote_fixtures
  225. def test_get_pk_constraint(self, name):
  226. insp = inspect(config.db)
  227. assert insp.get_pk_constraint(name)
  228. @quote_fixtures
  229. def test_get_foreign_keys(self, name):
  230. insp = inspect(config.db)
  231. assert insp.get_foreign_keys(name)
  232. @quote_fixtures
  233. def test_get_indexes(self, name):
  234. insp = inspect(config.db)
  235. assert insp.get_indexes(name)
  236. @quote_fixtures
  237. @testing.requires.unique_constraint_reflection
  238. def test_get_unique_constraints(self, name):
  239. insp = inspect(config.db)
  240. assert insp.get_unique_constraints(name)
  241. @quote_fixtures
  242. @testing.requires.comment_reflection
  243. def test_get_table_comment(self, name):
  244. insp = inspect(config.db)
  245. assert insp.get_table_comment(name)
  246. @quote_fixtures
  247. @testing.requires.check_constraint_reflection
  248. def test_get_check_constraints(self, name):
  249. insp = inspect(config.db)
  250. assert insp.get_check_constraints(name)
  251. class ComponentReflectionTest(fixtures.TablesTest):
  252. run_inserts = run_deletes = None
  253. __backend__ = True
  254. @classmethod
  255. def setup_bind(cls):
  256. if config.requirements.independent_connections.enabled:
  257. from sqlalchemy import pool
  258. return engines.testing_engine(
  259. options=dict(poolclass=pool.StaticPool, scope="class"),
  260. )
  261. else:
  262. return config.db
  263. @classmethod
  264. def define_tables(cls, metadata):
  265. cls.define_reflected_tables(metadata, None)
  266. if testing.requires.schemas.enabled:
  267. cls.define_reflected_tables(metadata, testing.config.test_schema)
  268. @classmethod
  269. def define_reflected_tables(cls, metadata, schema):
  270. if schema:
  271. schema_prefix = schema + "."
  272. else:
  273. schema_prefix = ""
  274. if testing.requires.self_referential_foreign_keys.enabled:
  275. users = Table(
  276. "users",
  277. metadata,
  278. Column("user_id", sa.INT, primary_key=True),
  279. Column("test1", sa.CHAR(5), nullable=False),
  280. Column("test2", sa.Float(5), nullable=False),
  281. Column(
  282. "parent_user_id",
  283. sa.Integer,
  284. sa.ForeignKey(
  285. "%susers.user_id" % schema_prefix, name="user_id_fk"
  286. ),
  287. ),
  288. schema=schema,
  289. test_needs_fk=True,
  290. )
  291. else:
  292. users = Table(
  293. "users",
  294. metadata,
  295. Column("user_id", sa.INT, primary_key=True),
  296. Column("test1", sa.CHAR(5), nullable=False),
  297. Column("test2", sa.Float(5), nullable=False),
  298. schema=schema,
  299. test_needs_fk=True,
  300. )
  301. Table(
  302. "dingalings",
  303. metadata,
  304. Column("dingaling_id", sa.Integer, primary_key=True),
  305. Column(
  306. "address_id",
  307. sa.Integer,
  308. sa.ForeignKey("%semail_addresses.address_id" % schema_prefix),
  309. ),
  310. Column("data", sa.String(30)),
  311. schema=schema,
  312. test_needs_fk=True,
  313. )
  314. Table(
  315. "email_addresses",
  316. metadata,
  317. Column("address_id", sa.Integer),
  318. Column(
  319. "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
  320. ),
  321. Column("email_address", sa.String(20)),
  322. sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
  323. schema=schema,
  324. test_needs_fk=True,
  325. )
  326. Table(
  327. "comment_test",
  328. metadata,
  329. Column("id", sa.Integer, primary_key=True, comment="id comment"),
  330. Column("data", sa.String(20), comment="data % comment"),
  331. Column(
  332. "d2",
  333. sa.String(20),
  334. comment=r"""Comment types type speedily ' " \ '' Fun!""",
  335. ),
  336. schema=schema,
  337. comment=r"""the test % ' " \ table comment""",
  338. )
  339. if testing.requires.cross_schema_fk_reflection.enabled:
  340. if schema is None:
  341. Table(
  342. "local_table",
  343. metadata,
  344. Column("id", sa.Integer, primary_key=True),
  345. Column("data", sa.String(20)),
  346. Column(
  347. "remote_id",
  348. ForeignKey(
  349. "%s.remote_table_2.id" % testing.config.test_schema
  350. ),
  351. ),
  352. test_needs_fk=True,
  353. schema=config.db.dialect.default_schema_name,
  354. )
  355. else:
  356. Table(
  357. "remote_table",
  358. metadata,
  359. Column("id", sa.Integer, primary_key=True),
  360. Column(
  361. "local_id",
  362. ForeignKey(
  363. "%s.local_table.id"
  364. % config.db.dialect.default_schema_name
  365. ),
  366. ),
  367. Column("data", sa.String(20)),
  368. schema=schema,
  369. test_needs_fk=True,
  370. )
  371. Table(
  372. "remote_table_2",
  373. metadata,
  374. Column("id", sa.Integer, primary_key=True),
  375. Column("data", sa.String(20)),
  376. schema=schema,
  377. test_needs_fk=True,
  378. )
  379. if testing.requires.index_reflection.enabled:
  380. cls.define_index(metadata, users)
  381. if not schema:
  382. # test_needs_fk is at the moment to force MySQL InnoDB
  383. noncol_idx_test_nopk = Table(
  384. "noncol_idx_test_nopk",
  385. metadata,
  386. Column("q", sa.String(5)),
  387. test_needs_fk=True,
  388. )
  389. noncol_idx_test_pk = Table(
  390. "noncol_idx_test_pk",
  391. metadata,
  392. Column("id", sa.Integer, primary_key=True),
  393. Column("q", sa.String(5)),
  394. test_needs_fk=True,
  395. )
  396. if testing.requires.indexes_with_ascdesc.enabled:
  397. Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
  398. Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
  399. if testing.requires.view_column_reflection.enabled:
  400. cls.define_views(metadata, schema)
  401. if not schema and testing.requires.temp_table_reflection.enabled:
  402. cls.define_temp_tables(metadata)
  403. @classmethod
  404. def define_temp_tables(cls, metadata):
  405. kw = temp_table_keyword_args(config, config.db)
  406. table_name = get_temp_table_name(
  407. config, config.db, "user_tmp_%s" % config.ident
  408. )
  409. user_tmp = Table(
  410. table_name,
  411. metadata,
  412. Column("id", sa.INT, primary_key=True),
  413. Column("name", sa.VARCHAR(50)),
  414. Column("foo", sa.INT),
  415. # disambiguate temp table unique constraint names. this is
  416. # pretty arbitrary for a generic dialect however we are doing
  417. # it to suit SQL Server which will produce name conflicts for
  418. # unique constraints created against temp tables in different
  419. # databases.
  420. # https://www.arbinada.com/en/node/1645
  421. sa.UniqueConstraint("name", name="user_tmp_uq_%s" % config.ident),
  422. sa.Index("user_tmp_ix", "foo"),
  423. **kw
  424. )
  425. if (
  426. testing.requires.view_reflection.enabled
  427. and testing.requires.temporary_views.enabled
  428. ):
  429. event.listen(
  430. user_tmp,
  431. "after_create",
  432. DDL(
  433. "create temporary view user_tmp_v as "
  434. "select * from user_tmp_%s" % config.ident
  435. ),
  436. )
  437. event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
  438. @classmethod
  439. def define_index(cls, metadata, users):
  440. Index("users_t_idx", users.c.test1, users.c.test2)
  441. Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
  442. @classmethod
  443. def define_views(cls, metadata, schema):
  444. for table_name in ("users", "email_addresses"):
  445. fullname = table_name
  446. if schema:
  447. fullname = "%s.%s" % (schema, table_name)
  448. view_name = fullname + "_v"
  449. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  450. view_name,
  451. fullname,
  452. )
  453. event.listen(metadata, "after_create", DDL(query))
  454. event.listen(
  455. metadata, "before_drop", DDL("DROP VIEW %s" % view_name)
  456. )
  457. @testing.requires.schema_reflection
  458. def test_get_schema_names(self):
  459. insp = inspect(self.bind)
  460. self.assert_(testing.config.test_schema in insp.get_schema_names())
  461. @testing.requires.schema_reflection
  462. def test_get_schema_names_w_translate_map(self, connection):
  463. """test #7300"""
  464. connection = connection.execution_options(
  465. schema_translate_map={
  466. "foo": "bar",
  467. BLANK_SCHEMA: testing.config.test_schema,
  468. }
  469. )
  470. insp = inspect(connection)
  471. self.assert_(testing.config.test_schema in insp.get_schema_names())
  472. @testing.requires.schema_reflection
  473. def test_dialect_initialize(self):
  474. engine = engines.testing_engine()
  475. inspect(engine)
  476. assert hasattr(engine.dialect, "default_schema_name")
  477. @testing.requires.schema_reflection
  478. def test_get_default_schema_name(self):
  479. insp = inspect(self.bind)
  480. eq_(insp.default_schema_name, self.bind.dialect.default_schema_name)
  481. @testing.requires.foreign_key_constraint_reflection
  482. @testing.combinations(
  483. (None, True, False, False),
  484. (None, True, False, True, testing.requires.schemas),
  485. ("foreign_key", True, False, False),
  486. (None, False, True, False),
  487. (None, False, True, True, testing.requires.schemas),
  488. (None, True, True, False),
  489. (None, True, True, True, testing.requires.schemas),
  490. argnames="order_by,include_plain,include_views,use_schema",
  491. )
  492. def test_get_table_names(
  493. self, connection, order_by, include_plain, include_views, use_schema
  494. ):
  495. if use_schema:
  496. schema = config.test_schema
  497. else:
  498. schema = None
  499. _ignore_tables = [
  500. "comment_test",
  501. "noncol_idx_test_pk",
  502. "noncol_idx_test_nopk",
  503. "local_table",
  504. "remote_table",
  505. "remote_table_2",
  506. ]
  507. insp = inspect(connection)
  508. if include_views:
  509. table_names = insp.get_view_names(schema)
  510. table_names.sort()
  511. answer = ["email_addresses_v", "users_v"]
  512. eq_(sorted(table_names), answer)
  513. if include_plain:
  514. if order_by:
  515. tables = [
  516. rec[0]
  517. for rec in insp.get_sorted_table_and_fkc_names(schema)
  518. if rec[0]
  519. ]
  520. else:
  521. tables = insp.get_table_names(schema)
  522. table_names = [t for t in tables if t not in _ignore_tables]
  523. if order_by == "foreign_key":
  524. answer = ["users", "email_addresses", "dingalings"]
  525. eq_(table_names, answer)
  526. else:
  527. answer = ["dingalings", "email_addresses", "users"]
  528. eq_(sorted(table_names), answer)
  529. @testing.requires.temp_table_names
  530. def test_get_temp_table_names(self):
  531. insp = inspect(self.bind)
  532. temp_table_names = insp.get_temp_table_names()
  533. eq_(sorted(temp_table_names), ["user_tmp_%s" % config.ident])
  534. @testing.requires.view_reflection
  535. @testing.requires.temp_table_names
  536. @testing.requires.temporary_views
  537. def test_get_temp_view_names(self):
  538. insp = inspect(self.bind)
  539. temp_table_names = insp.get_temp_view_names()
  540. eq_(sorted(temp_table_names), ["user_tmp_v"])
  541. @testing.requires.comment_reflection
  542. def test_get_comments(self):
  543. self._test_get_comments()
  544. @testing.requires.comment_reflection
  545. @testing.requires.schemas
  546. def test_get_comments_with_schema(self):
  547. self._test_get_comments(testing.config.test_schema)
  548. def _test_get_comments(self, schema=None):
  549. insp = inspect(self.bind)
  550. eq_(
  551. insp.get_table_comment("comment_test", schema=schema),
  552. {"text": r"""the test % ' " \ table comment"""},
  553. )
  554. eq_(insp.get_table_comment("users", schema=schema), {"text": None})
  555. eq_(
  556. [
  557. {"name": rec["name"], "comment": rec["comment"]}
  558. for rec in insp.get_columns("comment_test", schema=schema)
  559. ],
  560. [
  561. {"comment": "id comment", "name": "id"},
  562. {"comment": "data % comment", "name": "data"},
  563. {
  564. "comment": (
  565. r"""Comment types type speedily ' " \ '' Fun!"""
  566. ),
  567. "name": "d2",
  568. },
  569. ],
  570. )
  571. @testing.combinations(
  572. (False, False),
  573. (False, True, testing.requires.schemas),
  574. (True, False, testing.requires.view_reflection),
  575. (
  576. True,
  577. True,
  578. testing.requires.schemas + testing.requires.view_reflection,
  579. ),
  580. argnames="use_views,use_schema",
  581. )
  582. def test_get_columns(self, connection, use_views, use_schema):
  583. if use_schema:
  584. schema = config.test_schema
  585. else:
  586. schema = None
  587. users, addresses = (self.tables.users, self.tables.email_addresses)
  588. if use_views:
  589. table_names = ["users_v", "email_addresses_v"]
  590. else:
  591. table_names = ["users", "email_addresses"]
  592. insp = inspect(connection)
  593. for table_name, table in zip(table_names, (users, addresses)):
  594. schema_name = schema
  595. cols = insp.get_columns(table_name, schema=schema_name)
  596. self.assert_(len(cols) > 0, len(cols))
  597. # should be in order
  598. for i, col in enumerate(table.columns):
  599. eq_(col.name, cols[i]["name"])
  600. ctype = cols[i]["type"].__class__
  601. ctype_def = col.type
  602. if isinstance(ctype_def, sa.types.TypeEngine):
  603. ctype_def = ctype_def.__class__
  604. # Oracle returns Date for DateTime.
  605. if testing.against("oracle") and ctype_def in (
  606. sql_types.Date,
  607. sql_types.DateTime,
  608. ):
  609. ctype_def = sql_types.Date
  610. # assert that the desired type and return type share
  611. # a base within one of the generic types.
  612. self.assert_(
  613. len(
  614. set(ctype.__mro__)
  615. .intersection(ctype_def.__mro__)
  616. .intersection(
  617. [
  618. sql_types.Integer,
  619. sql_types.Numeric,
  620. sql_types.DateTime,
  621. sql_types.Date,
  622. sql_types.Time,
  623. sql_types.String,
  624. sql_types._Binary,
  625. ]
  626. )
  627. )
  628. > 0,
  629. "%s(%s), %s(%s)"
  630. % (col.name, col.type, cols[i]["name"], ctype),
  631. )
  632. if not col.primary_key:
  633. assert cols[i]["default"] is None
  634. @testing.requires.temp_table_reflection
  635. def test_get_temp_table_columns(self):
  636. table_name = get_temp_table_name(
  637. config, self.bind, "user_tmp_%s" % config.ident
  638. )
  639. user_tmp = self.tables[table_name]
  640. insp = inspect(self.bind)
  641. cols = insp.get_columns(table_name)
  642. self.assert_(len(cols) > 0, len(cols))
  643. for i, col in enumerate(user_tmp.columns):
  644. eq_(col.name, cols[i]["name"])
  645. @testing.requires.temp_table_reflection
  646. @testing.requires.view_column_reflection
  647. @testing.requires.temporary_views
  648. def test_get_temp_view_columns(self):
  649. insp = inspect(self.bind)
  650. cols = insp.get_columns("user_tmp_v")
  651. eq_([col["name"] for col in cols], ["id", "name", "foo"])
  652. @testing.combinations(
  653. (False,), (True, testing.requires.schemas), argnames="use_schema"
  654. )
  655. @testing.requires.primary_key_constraint_reflection
  656. def test_get_pk_constraint(self, connection, use_schema):
  657. if use_schema:
  658. schema = testing.config.test_schema
  659. else:
  660. schema = None
  661. users, addresses = self.tables.users, self.tables.email_addresses
  662. insp = inspect(connection)
  663. users_cons = insp.get_pk_constraint(users.name, schema=schema)
  664. users_pkeys = users_cons["constrained_columns"]
  665. eq_(users_pkeys, ["user_id"])
  666. addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
  667. addr_pkeys = addr_cons["constrained_columns"]
  668. eq_(addr_pkeys, ["address_id"])
  669. with testing.requires.reflects_pk_names.fail_if():
  670. eq_(addr_cons["name"], "email_ad_pk")
  671. @testing.combinations(
  672. (False,), (True, testing.requires.schemas), argnames="use_schema"
  673. )
  674. @testing.requires.foreign_key_constraint_reflection
  675. def test_get_foreign_keys(self, connection, use_schema):
  676. if use_schema:
  677. schema = config.test_schema
  678. else:
  679. schema = None
  680. users, addresses = (self.tables.users, self.tables.email_addresses)
  681. insp = inspect(connection)
  682. expected_schema = schema
  683. # users
  684. if testing.requires.self_referential_foreign_keys.enabled:
  685. users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
  686. fkey1 = users_fkeys[0]
  687. with testing.requires.named_constraints.fail_if():
  688. eq_(fkey1["name"], "user_id_fk")
  689. eq_(fkey1["referred_schema"], expected_schema)
  690. eq_(fkey1["referred_table"], users.name)
  691. eq_(fkey1["referred_columns"], ["user_id"])
  692. if testing.requires.self_referential_foreign_keys.enabled:
  693. eq_(fkey1["constrained_columns"], ["parent_user_id"])
  694. # addresses
  695. addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
  696. fkey1 = addr_fkeys[0]
  697. with testing.requires.implicitly_named_constraints.fail_if():
  698. self.assert_(fkey1["name"] is not None)
  699. eq_(fkey1["referred_schema"], expected_schema)
  700. eq_(fkey1["referred_table"], users.name)
  701. eq_(fkey1["referred_columns"], ["user_id"])
  702. eq_(fkey1["constrained_columns"], ["remote_user_id"])
  703. @testing.requires.cross_schema_fk_reflection
  704. @testing.requires.schemas
  705. def test_get_inter_schema_foreign_keys(self):
  706. local_table, remote_table, remote_table_2 = self.tables(
  707. "%s.local_table" % self.bind.dialect.default_schema_name,
  708. "%s.remote_table" % testing.config.test_schema,
  709. "%s.remote_table_2" % testing.config.test_schema,
  710. )
  711. insp = inspect(self.bind)
  712. local_fkeys = insp.get_foreign_keys(local_table.name)
  713. eq_(len(local_fkeys), 1)
  714. fkey1 = local_fkeys[0]
  715. eq_(fkey1["referred_schema"], testing.config.test_schema)
  716. eq_(fkey1["referred_table"], remote_table_2.name)
  717. eq_(fkey1["referred_columns"], ["id"])
  718. eq_(fkey1["constrained_columns"], ["remote_id"])
  719. remote_fkeys = insp.get_foreign_keys(
  720. remote_table.name, schema=testing.config.test_schema
  721. )
  722. eq_(len(remote_fkeys), 1)
  723. fkey2 = remote_fkeys[0]
  724. assert fkey2["referred_schema"] in (
  725. None,
  726. self.bind.dialect.default_schema_name,
  727. )
  728. eq_(fkey2["referred_table"], local_table.name)
  729. eq_(fkey2["referred_columns"], ["id"])
  730. eq_(fkey2["constrained_columns"], ["local_id"])
  731. def _assert_insp_indexes(self, indexes, expected_indexes):
  732. index_names = [d["name"] for d in indexes]
  733. for e_index in expected_indexes:
  734. assert e_index["name"] in index_names
  735. index = indexes[index_names.index(e_index["name"])]
  736. for key in e_index:
  737. eq_(e_index[key], index[key])
  738. @testing.combinations(
  739. (False,), (True, testing.requires.schemas), argnames="use_schema"
  740. )
  741. def test_get_indexes(self, connection, use_schema):
  742. if use_schema:
  743. schema = config.test_schema
  744. else:
  745. schema = None
  746. # The database may decide to create indexes for foreign keys, etc.
  747. # so there may be more indexes than expected.
  748. insp = inspect(self.bind)
  749. indexes = insp.get_indexes("users", schema=schema)
  750. expected_indexes = [
  751. {
  752. "unique": False,
  753. "column_names": ["test1", "test2"],
  754. "name": "users_t_idx",
  755. },
  756. {
  757. "unique": False,
  758. "column_names": ["user_id", "test2", "test1"],
  759. "name": "users_all_idx",
  760. },
  761. ]
  762. self._assert_insp_indexes(indexes, expected_indexes)
  763. @testing.combinations(
  764. ("noncol_idx_test_nopk", "noncol_idx_nopk"),
  765. ("noncol_idx_test_pk", "noncol_idx_pk"),
  766. argnames="tname,ixname",
  767. )
  768. @testing.requires.index_reflection
  769. @testing.requires.indexes_with_ascdesc
  770. def test_get_noncol_index(self, connection, tname, ixname):
  771. insp = inspect(connection)
  772. indexes = insp.get_indexes(tname)
  773. # reflecting an index that has "x DESC" in it as the column.
  774. # the DB may or may not give us "x", but make sure we get the index
  775. # back, it has a name, it's connected to the table.
  776. expected_indexes = [{"unique": False, "name": ixname}]
  777. self._assert_insp_indexes(indexes, expected_indexes)
  778. t = Table(tname, MetaData(), autoload_with=connection)
  779. eq_(len(t.indexes), 1)
  780. is_(list(t.indexes)[0].table, t)
  781. eq_(list(t.indexes)[0].name, ixname)
  782. @testing.requires.temp_table_reflection
  783. @testing.requires.unique_constraint_reflection
  784. def test_get_temp_table_unique_constraints(self):
  785. insp = inspect(self.bind)
  786. reflected = insp.get_unique_constraints("user_tmp_%s" % config.ident)
  787. for refl in reflected:
  788. # Different dialects handle duplicate index and constraints
  789. # differently, so ignore this flag
  790. refl.pop("duplicates_index", None)
  791. eq_(
  792. reflected,
  793. [
  794. {
  795. "column_names": ["name"],
  796. "name": "user_tmp_uq_%s" % config.ident,
  797. }
  798. ],
  799. )
  800. @testing.requires.temp_table_reflect_indexes
  801. def test_get_temp_table_indexes(self):
  802. insp = inspect(self.bind)
  803. table_name = get_temp_table_name(
  804. config, config.db, "user_tmp_%s" % config.ident
  805. )
  806. indexes = insp.get_indexes(table_name)
  807. for ind in indexes:
  808. ind.pop("dialect_options", None)
  809. expected = [
  810. {"unique": False, "column_names": ["foo"], "name": "user_tmp_ix"}
  811. ]
  812. if testing.requires.index_reflects_included_columns.enabled:
  813. expected[0]["include_columns"] = []
  814. eq_(
  815. [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
  816. expected,
  817. )
  818. @testing.combinations(
  819. (True, testing.requires.schemas), (False,), argnames="use_schema"
  820. )
  821. @testing.requires.unique_constraint_reflection
  822. def test_get_unique_constraints(self, metadata, connection, use_schema):
  823. # SQLite dialect needs to parse the names of the constraints
  824. # separately from what it gets from PRAGMA index_list(), and
  825. # then matches them up. so same set of column_names in two
  826. # constraints will confuse it. Perhaps we should no longer
  827. # bother with index_list() here since we have the whole
  828. # CREATE TABLE?
  829. if use_schema:
  830. schema = config.test_schema
  831. else:
  832. schema = None
  833. uniques = sorted(
  834. [
  835. {"name": "unique_a", "column_names": ["a"]},
  836. {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
  837. {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
  838. {"name": "unique_asc_key", "column_names": ["asc", "key"]},
  839. {"name": "i.have.dots", "column_names": ["b"]},
  840. {"name": "i have spaces", "column_names": ["c"]},
  841. ],
  842. key=operator.itemgetter("name"),
  843. )
  844. table = Table(
  845. "testtbl",
  846. metadata,
  847. Column("a", sa.String(20)),
  848. Column("b", sa.String(30)),
  849. Column("c", sa.Integer),
  850. # reserved identifiers
  851. Column("asc", sa.String(30)),
  852. Column("key", sa.String(30)),
  853. schema=schema,
  854. )
  855. for uc in uniques:
  856. table.append_constraint(
  857. sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
  858. )
  859. table.create(connection)
  860. inspector = inspect(connection)
  861. reflected = sorted(
  862. inspector.get_unique_constraints("testtbl", schema=schema),
  863. key=operator.itemgetter("name"),
  864. )
  865. names_that_duplicate_index = set()
  866. for orig, refl in zip(uniques, reflected):
  867. # Different dialects handle duplicate index and constraints
  868. # differently, so ignore this flag
  869. dupe = refl.pop("duplicates_index", None)
  870. if dupe:
  871. names_that_duplicate_index.add(dupe)
  872. eq_(orig, refl)
  873. reflected_metadata = MetaData()
  874. reflected = Table(
  875. "testtbl",
  876. reflected_metadata,
  877. autoload_with=connection,
  878. schema=schema,
  879. )
  880. # test "deduplicates for index" logic. MySQL and Oracle
  881. # "unique constraints" are actually unique indexes (with possible
  882. # exception of a unique that is a dupe of another one in the case
  883. # of Oracle). make sure # they aren't duplicated.
  884. idx_names = set([idx.name for idx in reflected.indexes])
  885. uq_names = set(
  886. [
  887. uq.name
  888. for uq in reflected.constraints
  889. if isinstance(uq, sa.UniqueConstraint)
  890. ]
  891. ).difference(["unique_c_a_b"])
  892. assert not idx_names.intersection(uq_names)
  893. if names_that_duplicate_index:
  894. eq_(names_that_duplicate_index, idx_names)
  895. eq_(uq_names, set())
  896. @testing.requires.view_reflection
  897. @testing.combinations(
  898. (False,), (True, testing.requires.schemas), argnames="use_schema"
  899. )
  900. def test_get_view_definition(self, connection, use_schema):
  901. if use_schema:
  902. schema = config.test_schema
  903. else:
  904. schema = None
  905. view_name1 = "users_v"
  906. view_name2 = "email_addresses_v"
  907. insp = inspect(connection)
  908. v1 = insp.get_view_definition(view_name1, schema=schema)
  909. self.assert_(v1)
  910. v2 = insp.get_view_definition(view_name2, schema=schema)
  911. self.assert_(v2)
  912. # why is this here if it's PG specific ?
  913. @testing.combinations(
  914. ("users", False),
  915. ("users", True, testing.requires.schemas),
  916. argnames="table_name,use_schema",
  917. )
  918. @testing.only_on("postgresql", "PG specific feature")
  919. def test_get_table_oid(self, connection, table_name, use_schema):
  920. if use_schema:
  921. schema = config.test_schema
  922. else:
  923. schema = None
  924. insp = inspect(connection)
  925. oid = insp.get_table_oid(table_name, schema)
  926. self.assert_(isinstance(oid, int))
  927. @testing.requires.table_reflection
  928. def test_autoincrement_col(self):
  929. """test that 'autoincrement' is reflected according to sqla's policy.
  930. Don't mark this test as unsupported for any backend !
  931. (technically it fails with MySQL InnoDB since "id" comes before "id2")
  932. A backend is better off not returning "autoincrement" at all,
  933. instead of potentially returning "False" for an auto-incrementing
  934. primary key column.
  935. """
  936. insp = inspect(self.bind)
  937. for tname, cname in [
  938. ("users", "user_id"),
  939. ("email_addresses", "address_id"),
  940. ("dingalings", "dingaling_id"),
  941. ]:
  942. cols = insp.get_columns(tname)
  943. id_ = {c["name"]: c for c in cols}[cname]
  944. assert id_.get("autoincrement", True)
  945. class TableNoColumnsTest(fixtures.TestBase):
  946. __requires__ = ("reflect_tables_no_columns",)
  947. __backend__ = True
  948. @testing.fixture
  949. def table_no_columns(self, connection, metadata):
  950. Table("empty", metadata)
  951. metadata.create_all(connection)
  952. @testing.fixture
  953. def view_no_columns(self, connection, metadata):
  954. Table("empty", metadata)
  955. metadata.create_all(connection)
  956. Table("empty", metadata)
  957. event.listen(
  958. metadata,
  959. "after_create",
  960. DDL("CREATE VIEW empty_v AS SELECT * FROM empty"),
  961. )
  962. # for transactional DDL the transaction is rolled back before this
  963. # drop statement is invoked
  964. event.listen(
  965. metadata, "before_drop", DDL("DROP VIEW IF EXISTS empty_v")
  966. )
  967. metadata.create_all(connection)
  968. @testing.requires.reflect_tables_no_columns
  969. def test_reflect_table_no_columns(self, connection, table_no_columns):
  970. t2 = Table("empty", MetaData(), autoload_with=connection)
  971. eq_(list(t2.c), [])
  972. @testing.requires.reflect_tables_no_columns
  973. def test_get_columns_table_no_columns(self, connection, table_no_columns):
  974. eq_(inspect(connection).get_columns("empty"), [])
  975. @testing.requires.reflect_tables_no_columns
  976. def test_reflect_incl_table_no_columns(self, connection, table_no_columns):
  977. m = MetaData()
  978. m.reflect(connection)
  979. assert set(m.tables).intersection(["empty"])
  980. @testing.requires.views
  981. @testing.requires.reflect_tables_no_columns
  982. def test_reflect_view_no_columns(self, connection, view_no_columns):
  983. t2 = Table("empty_v", MetaData(), autoload_with=connection)
  984. eq_(list(t2.c), [])
  985. @testing.requires.views
  986. @testing.requires.reflect_tables_no_columns
  987. def test_get_columns_view_no_columns(self, connection, view_no_columns):
  988. eq_(inspect(connection).get_columns("empty_v"), [])
  989. class ComponentReflectionTestExtra(fixtures.TestBase):
  990. __backend__ = True
  991. @testing.combinations(
  992. (True, testing.requires.schemas), (False,), argnames="use_schema"
  993. )
  994. @testing.requires.check_constraint_reflection
  995. def test_get_check_constraints(self, metadata, connection, use_schema):
  996. if use_schema:
  997. schema = config.test_schema
  998. else:
  999. schema = None
  1000. Table(
  1001. "sa_cc",
  1002. metadata,
  1003. Column("a", Integer()),
  1004. sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
  1005. sa.CheckConstraint("a = 1 OR (a > 2 AND a < 5)", name="cc2"),
  1006. schema=schema,
  1007. )
  1008. metadata.create_all(connection)
  1009. inspector = inspect(connection)
  1010. reflected = sorted(
  1011. inspector.get_check_constraints("sa_cc", schema=schema),
  1012. key=operator.itemgetter("name"),
  1013. )
  1014. # trying to minimize effect of quoting, parenthesis, etc.
  1015. # may need to add more to this as new dialects get CHECK
  1016. # constraint reflection support
  1017. def normalize(sqltext):
  1018. return " ".join(
  1019. re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
  1020. )
  1021. reflected = [
  1022. {"name": item["name"], "sqltext": normalize(item["sqltext"])}
  1023. for item in reflected
  1024. ]
  1025. eq_(
  1026. reflected,
  1027. [
  1028. {"name": "cc1", "sqltext": "a > 1 and a < 5"},
  1029. {"name": "cc2", "sqltext": "a = 1 or a > 2 and a < 5"},
  1030. ],
  1031. )
  1032. @testing.requires.indexes_with_expressions
  1033. def test_reflect_expression_based_indexes(self, metadata, connection):
  1034. t = Table(
  1035. "t",
  1036. metadata,
  1037. Column("x", String(30)),
  1038. Column("y", String(30)),
  1039. )
  1040. Index("t_idx", func.lower(t.c.x), func.lower(t.c.y))
  1041. Index("t_idx_2", t.c.x)
  1042. metadata.create_all(connection)
  1043. insp = inspect(connection)
  1044. expected = [
  1045. {"name": "t_idx_2", "column_names": ["x"], "unique": False}
  1046. ]
  1047. if testing.requires.index_reflects_included_columns.enabled:
  1048. expected[0]["include_columns"] = []
  1049. expected[0]["dialect_options"] = {
  1050. "%s_include" % connection.engine.name: []
  1051. }
  1052. with expect_warnings(
  1053. "Skipped unsupported reflection of expression-based index t_idx"
  1054. ):
  1055. eq_(
  1056. insp.get_indexes("t"),
  1057. expected,
  1058. )
  1059. @testing.requires.index_reflects_included_columns
  1060. def test_reflect_covering_index(self, metadata, connection):
  1061. t = Table(
  1062. "t",
  1063. metadata,
  1064. Column("x", String(30)),
  1065. Column("y", String(30)),
  1066. )
  1067. idx = Index("t_idx", t.c.x)
  1068. idx.dialect_options[connection.engine.name]["include"] = ["y"]
  1069. metadata.create_all(connection)
  1070. insp = inspect(connection)
  1071. eq_(
  1072. insp.get_indexes("t"),
  1073. [
  1074. {
  1075. "name": "t_idx",
  1076. "column_names": ["x"],
  1077. "include_columns": ["y"],
  1078. "unique": False,
  1079. "dialect_options": {
  1080. "%s_include" % connection.engine.name: ["y"]
  1081. },
  1082. }
  1083. ],
  1084. )
  1085. t2 = Table("t", MetaData(), autoload_with=connection)
  1086. eq_(
  1087. list(t2.indexes)[0].dialect_options[connection.engine.name][
  1088. "include"
  1089. ],
  1090. ["y"],
  1091. )
  1092. def _type_round_trip(self, connection, metadata, *types):
  1093. t = Table(
  1094. "t",
  1095. metadata,
  1096. *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
  1097. )
  1098. t.create(connection)
  1099. return [c["type"] for c in inspect(connection).get_columns("t")]
  1100. @testing.requires.table_reflection
  1101. def test_numeric_reflection(self, connection, metadata):
  1102. for typ in self._type_round_trip(
  1103. connection, metadata, sql_types.Numeric(18, 5)
  1104. ):
  1105. assert isinstance(typ, sql_types.Numeric)
  1106. eq_(typ.precision, 18)
  1107. eq_(typ.scale, 5)
  1108. @testing.requires.table_reflection
  1109. def test_varchar_reflection(self, connection, metadata):
  1110. typ = self._type_round_trip(
  1111. connection, metadata, sql_types.String(52)
  1112. )[0]
  1113. assert isinstance(typ, sql_types.String)
  1114. eq_(typ.length, 52)
  1115. @testing.requires.table_reflection
  1116. def test_nullable_reflection(self, connection, metadata):
  1117. t = Table(
  1118. "t",
  1119. metadata,
  1120. Column("a", Integer, nullable=True),
  1121. Column("b", Integer, nullable=False),
  1122. )
  1123. t.create(connection)
  1124. eq_(
  1125. dict(
  1126. (col["name"], col["nullable"])
  1127. for col in inspect(connection).get_columns("t")
  1128. ),
  1129. {"a": True, "b": False},
  1130. )
  1131. @testing.combinations(
  1132. (
  1133. None,
  1134. "CASCADE",
  1135. None,
  1136. testing.requires.foreign_key_constraint_option_reflection_ondelete,
  1137. ),
  1138. (
  1139. None,
  1140. None,
  1141. "SET NULL",
  1142. testing.requires.foreign_key_constraint_option_reflection_onupdate,
  1143. ),
  1144. (
  1145. {},
  1146. None,
  1147. "NO ACTION",
  1148. testing.requires.foreign_key_constraint_option_reflection_onupdate,
  1149. ),
  1150. (
  1151. {},
  1152. "NO ACTION",
  1153. None,
  1154. testing.requires.fk_constraint_option_reflection_ondelete_noaction,
  1155. ),
  1156. (
  1157. None,
  1158. None,
  1159. "RESTRICT",
  1160. testing.requires.fk_constraint_option_reflection_onupdate_restrict,
  1161. ),
  1162. (
  1163. None,
  1164. "RESTRICT",
  1165. None,
  1166. testing.requires.fk_constraint_option_reflection_ondelete_restrict,
  1167. ),
  1168. argnames="expected,ondelete,onupdate",
  1169. )
  1170. def test_get_foreign_key_options(
  1171. self, connection, metadata, expected, ondelete, onupdate
  1172. ):
  1173. options = {}
  1174. if ondelete:
  1175. options["ondelete"] = ondelete
  1176. if onupdate:
  1177. options["onupdate"] = onupdate
  1178. if expected is None:
  1179. expected = options
  1180. Table(
  1181. "x",
  1182. metadata,
  1183. Column("id", Integer, primary_key=True),
  1184. test_needs_fk=True,
  1185. )
  1186. Table(
  1187. "table",
  1188. metadata,
  1189. Column("id", Integer, primary_key=True),
  1190. Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
  1191. Column("test", String(10)),
  1192. test_needs_fk=True,
  1193. )
  1194. Table(
  1195. "user",
  1196. metadata,
  1197. Column("id", Integer, primary_key=True),
  1198. Column("name", String(50), nullable=False),
  1199. Column("tid", Integer),
  1200. sa.ForeignKeyConstraint(
  1201. ["tid"], ["table.id"], name="myfk", **options
  1202. ),
  1203. test_needs_fk=True,
  1204. )
  1205. metadata.create_all(connection)
  1206. insp = inspect(connection)
  1207. # test 'options' is always present for a backend
  1208. # that can reflect these, since alembic looks for this
  1209. opts = insp.get_foreign_keys("table")[0]["options"]
  1210. eq_(dict((k, opts[k]) for k in opts if opts[k]), {})
  1211. opts = insp.get_foreign_keys("user")[0]["options"]
  1212. eq_(opts, expected)
  1213. # eq_(dict((k, opts[k]) for k in opts if opts[k]), expected)
  1214. class NormalizedNameTest(fixtures.TablesTest):
  1215. __requires__ = ("denormalized_names",)
  1216. __backend__ = True
  1217. @classmethod
  1218. def define_tables(cls, metadata):
  1219. Table(
  1220. quoted_name("t1", quote=True),
  1221. metadata,
  1222. Column("id", Integer, primary_key=True),
  1223. )
  1224. Table(
  1225. quoted_name("t2", quote=True),
  1226. metadata,
  1227. Column("id", Integer, primary_key=True),
  1228. Column("t1id", ForeignKey("t1.id")),
  1229. )
  1230. def test_reflect_lowercase_forced_tables(self):
  1231. m2 = MetaData()
  1232. t2_ref = Table(
  1233. quoted_name("t2", quote=True), m2, autoload_with=config.db
  1234. )
  1235. t1_ref = m2.tables["t1"]
  1236. assert t2_ref.c.t1id.references(t1_ref.c.id)
  1237. m3 = MetaData()
  1238. m3.reflect(
  1239. config.db, only=lambda name, m: name.lower() in ("t1", "t2")
  1240. )
  1241. assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id)
  1242. def test_get_table_names(self):
  1243. tablenames = [
  1244. t
  1245. for t in inspect(config.db).get_table_names()
  1246. if t.lower() in ("t1", "t2")
  1247. ]
  1248. eq_(tablenames[0].upper(), tablenames[0].lower())
  1249. eq_(tablenames[1].upper(), tablenames[1].lower())
  1250. class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
  1251. def test_computed_col_default_not_set(self):
  1252. insp = inspect(config.db)
  1253. cols = insp.get_columns("computed_default_table")
  1254. col_data = {c["name"]: c for c in cols}
  1255. is_true("42" in col_data["with_default"]["default"])
  1256. is_(col_data["normal"]["default"], None)
  1257. is_(col_data["computed_col"]["default"], None)
  1258. def test_get_column_returns_computed(self):
  1259. insp = inspect(config.db)
  1260. cols = insp.get_columns("computed_default_table")
  1261. data = {c["name"]: c for c in cols}
  1262. for key in ("id", "normal", "with_default"):
  1263. is_true("computed" not in data[key])
  1264. compData = data["computed_col"]
  1265. is_true("computed" in compData)
  1266. is_true("sqltext" in compData["computed"])
  1267. eq_(self.normalize(compData["computed"]["sqltext"]), "normal+42")
  1268. eq_(
  1269. "persisted" in compData["computed"],
  1270. testing.requires.computed_columns_reflect_persisted.enabled,
  1271. )
  1272. if testing.requires.computed_columns_reflect_persisted.enabled:
  1273. eq_(
  1274. compData["computed"]["persisted"],
  1275. testing.requires.computed_columns_default_persisted.enabled,
  1276. )
  1277. def check_column(self, data, column, sqltext, persisted):
  1278. is_true("computed" in data[column])
  1279. compData = data[column]["computed"]
  1280. eq_(self.normalize(compData["sqltext"]), sqltext)
  1281. if testing.requires.computed_columns_reflect_persisted.enabled:
  1282. is_true("persisted" in compData)
  1283. is_(compData["persisted"], persisted)
  1284. def test_get_column_returns_persisted(self):
  1285. insp = inspect(config.db)
  1286. cols = insp.get_columns("computed_column_table")
  1287. data = {c["name"]: c for c in cols}
  1288. self.check_column(
  1289. data,
  1290. "computed_no_flag",
  1291. "normal+42",
  1292. testing.requires.computed_columns_default_persisted.enabled,
  1293. )
  1294. if testing.requires.computed_columns_virtual.enabled:
  1295. self.check_column(
  1296. data,
  1297. "computed_virtual",
  1298. "normal+2",
  1299. False,
  1300. )
  1301. if testing.requires.computed_columns_stored.enabled:
  1302. self.check_column(
  1303. data,
  1304. "computed_stored",
  1305. "normal-42",
  1306. True,
  1307. )
  1308. @testing.requires.schemas
  1309. def test_get_column_returns_persisted_with_schema(self):
  1310. insp = inspect(config.db)
  1311. cols = insp.get_columns(
  1312. "computed_column_table", schema=config.test_schema
  1313. )
  1314. data = {c["name"]: c for c in cols}
  1315. self.check_column(
  1316. data,
  1317. "computed_no_flag",
  1318. "normal/42",
  1319. testing.requires.computed_columns_default_persisted.enabled,
  1320. )
  1321. if testing.requires.computed_columns_virtual.enabled:
  1322. self.check_column(
  1323. data,
  1324. "computed_virtual",
  1325. "normal/2",
  1326. False,
  1327. )
  1328. if testing.requires.computed_columns_stored.enabled:
  1329. self.check_column(
  1330. data,
  1331. "computed_stored",
  1332. "normal*42",
  1333. True,
  1334. )
  1335. class IdentityReflectionTest(fixtures.TablesTest):
  1336. run_inserts = run_deletes = None
  1337. __backend__ = True
  1338. __requires__ = ("identity_columns", "table_reflection")
  1339. @classmethod
  1340. def define_tables(cls, metadata):
  1341. Table(
  1342. "t1",
  1343. metadata,
  1344. Column("normal", Integer),
  1345. Column("id1", Integer, Identity()),
  1346. )
  1347. Table(
  1348. "t2",
  1349. metadata,
  1350. Column(
  1351. "id2",
  1352. Integer,
  1353. Identity(
  1354. always=True,
  1355. start=2,
  1356. increment=3,
  1357. minvalue=-2,
  1358. maxvalue=42,
  1359. cycle=True,
  1360. cache=4,
  1361. ),
  1362. ),
  1363. )
  1364. if testing.requires.schemas.enabled:
  1365. Table(
  1366. "t1",
  1367. metadata,
  1368. Column("normal", Integer),
  1369. Column("id1", Integer, Identity(always=True, start=20)),
  1370. schema=config.test_schema,
  1371. )
  1372. def check(self, value, exp, approx):
  1373. if testing.requires.identity_columns_standard.enabled:
  1374. common_keys = (
  1375. "always",
  1376. "start",
  1377. "increment",
  1378. "minvalue",
  1379. "maxvalue",
  1380. "cycle",
  1381. "cache",
  1382. )
  1383. for k in list(value):
  1384. if k not in common_keys:
  1385. value.pop(k)
  1386. if approx:
  1387. eq_(len(value), len(exp))
  1388. for k in value:
  1389. if k == "minvalue":
  1390. is_true(value[k] <= exp[k])
  1391. elif k in {"maxvalue", "cache"}:
  1392. is_true(value[k] >= exp[k])
  1393. else:
  1394. eq_(value[k], exp[k], k)
  1395. else:
  1396. eq_(value, exp)
  1397. else:
  1398. eq_(value["start"], exp["start"])
  1399. eq_(value["increment"], exp["increment"])
  1400. def test_reflect_identity(self):
  1401. insp = inspect(config.db)
  1402. cols = insp.get_columns("t1") + insp.get_columns("t2")
  1403. for col in cols:
  1404. if col["name"] == "normal":
  1405. is_false("identity" in col)
  1406. elif col["name"] == "id1":
  1407. is_true(col["autoincrement"] in (True, "auto"))
  1408. eq_(col["default"], None)
  1409. is_true("identity" in col)
  1410. self.check(
  1411. col["identity"],
  1412. dict(
  1413. always=False,
  1414. start=1,
  1415. increment=1,
  1416. minvalue=1,
  1417. maxvalue=2147483647,
  1418. cycle=False,
  1419. cache=1,
  1420. ),
  1421. approx=True,
  1422. )
  1423. elif col["name"] == "id2":
  1424. is_true(col["autoincrement"] in (True, "auto"))
  1425. eq_(col["default"], None)
  1426. is_true("identity" in col)
  1427. self.check(
  1428. col["identity"],
  1429. dict(
  1430. always=True,
  1431. start=2,
  1432. increment=3,
  1433. minvalue=-2,
  1434. maxvalue=42,
  1435. cycle=True,
  1436. cache=4,
  1437. ),
  1438. approx=False,
  1439. )
  1440. @testing.requires.schemas
  1441. def test_reflect_identity_schema(self):
  1442. insp = inspect(config.db)
  1443. cols = insp.get_columns("t1", schema=config.test_schema)
  1444. for col in cols:
  1445. if col["name"] == "normal":
  1446. is_false("identity" in col)
  1447. elif col["name"] == "id1":
  1448. is_true(col["autoincrement"] in (True, "auto"))
  1449. eq_(col["default"], None)
  1450. is_true("identity" in col)
  1451. self.check(
  1452. col["identity"],
  1453. dict(
  1454. always=True,
  1455. start=20,
  1456. increment=1,
  1457. minvalue=1,
  1458. maxvalue=2147483647,
  1459. cycle=False,
  1460. cache=1,
  1461. ),
  1462. approx=True,
  1463. )
  1464. class CompositeKeyReflectionTest(fixtures.TablesTest):
  1465. __backend__ = True
  1466. @classmethod
  1467. def define_tables(cls, metadata):
  1468. tb1 = Table(
  1469. "tb1",
  1470. metadata,
  1471. Column("id", Integer),
  1472. Column("attr", Integer),
  1473. Column("name", sql_types.VARCHAR(20)),
  1474. sa.PrimaryKeyConstraint("name", "id", "attr", name="pk_tb1"),
  1475. schema=None,
  1476. test_needs_fk=True,
  1477. )
  1478. Table(
  1479. "tb2",
  1480. metadata,
  1481. Column("id", Integer, primary_key=True),
  1482. Column("pid", Integer),
  1483. Column("pattr", Integer),
  1484. Column("pname", sql_types.VARCHAR(20)),
  1485. sa.ForeignKeyConstraint(
  1486. ["pname", "pid", "pattr"],
  1487. [tb1.c.name, tb1.c.id, tb1.c.attr],
  1488. name="fk_tb1_name_id_attr",
  1489. ),
  1490. schema=None,
  1491. test_needs_fk=True,
  1492. )
  1493. @testing.requires.primary_key_constraint_reflection
  1494. def test_pk_column_order(self):
  1495. # test for issue #5661
  1496. insp = inspect(self.bind)
  1497. primary_key = insp.get_pk_constraint(self.tables.tb1.name)
  1498. eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"])
  1499. @testing.requires.foreign_key_constraint_reflection
  1500. def test_fk_column_order(self):
  1501. # test for issue #5661
  1502. insp = inspect(self.bind)
  1503. foreign_keys = insp.get_foreign_keys(self.tables.tb2.name)
  1504. eq_(len(foreign_keys), 1)
  1505. fkey1 = foreign_keys[0]
  1506. eq_(fkey1.get("referred_columns"), ["name", "id", "attr"])
  1507. eq_(fkey1.get("constrained_columns"), ["pname", "pid", "pattr"])
  1508. __all__ = (
  1509. "ComponentReflectionTest",
  1510. "ComponentReflectionTestExtra",
  1511. "TableNoColumnsTest",
  1512. "QuotedNameArgumentTest",
  1513. "HasTableTest",
  1514. "HasIndexTest",
  1515. "NormalizedNameTest",
  1516. "ComputedReflectionTest",
  1517. "IdentityReflectionTest",
  1518. "CompositeKeyReflectionTest",
  1519. )