12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519 |
- # postgresql/base.py
- # Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
- # <see AUTHORS file>
- #
- # This module is part of SQLAlchemy and is released under
- # the MIT License: https://www.opensource.org/licenses/mit-license.php
- r"""
- .. dialect:: postgresql
- :name: PostgreSQL
- :full_support: 9.6, 10, 11, 12, 13
- :normal_support: 9.6+
- :best_effort: 8+
- .. _postgresql_sequences:
- Sequences/SERIAL/IDENTITY
- -------------------------
- PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
- of creating new primary key values for integer-based primary key columns. When
- creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
- integer-based primary key columns, which generates a sequence and server side
- default corresponding to the column.
- To specify a specific named sequence to be used for primary key generation,
- use the :func:`~sqlalchemy.schema.Sequence` construct::
- Table('sometable', metadata,
- Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
- )
- When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
- having the "last insert identifier" available, a RETURNING clause is added to
- the INSERT statement which specifies the primary key columns should be
- returned after the statement completes. The RETURNING functionality only takes
- place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
- sequence, whether specified explicitly or implicitly via ``SERIAL``, is
- executed independently beforehand, the returned value to be used in the
- subsequent insert. Note that when an
- :func:`~sqlalchemy.sql.expression.insert()` construct is executed using
- "executemany" semantics, the "last inserted identifier" functionality does not
- apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
- case.
- To turn off the default use of RETURNING, specify the flag
- ``implicit_returning=False`` to :func:`_sa.create_engine`.
- PostgreSQL 10 and above IDENTITY columns
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
- of SERIAL. The :class:`_schema.Identity` construct in a
- :class:`_schema.Column` can be used to control its behavior::
- from sqlalchemy import Table, Column, MetaData, Integer, Identity, String
- metadata = MetaData()
- data = Table(
- "data",
- metadata,
- Column(
- 'id', Integer, Identity(start=42, cycle=True), primary_key=True
- ),
- Column('data', String)
- )
- The CREATE TABLE for the above :class:`_schema.Table` object would be:
- .. sourcecode:: sql
- CREATE TABLE data (
- id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
- data VARCHAR,
- PRIMARY KEY (id)
- )
- .. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
- in a :class:`_schema.Column` to specify the option of an autoincrementing
- column.
- .. note::
- Previous versions of SQLAlchemy did not have built-in support for rendering
- of IDENTITY, and could use the following compilation hook to replace
- occurrences of SERIAL with IDENTITY::
- from sqlalchemy.schema import CreateColumn
- from sqlalchemy.ext.compiler import compiles
- @compiles(CreateColumn, 'postgresql')
- def use_identity(element, compiler, **kw):
- text = compiler.visit_create_column(element, **kw)
- text = text.replace(
- "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
- )
- return text
- Using the above, a table such as::
- t = Table(
- 't', m,
- Column('id', Integer, primary_key=True),
- Column('data', String)
- )
- Will generate on the backing database as::
- CREATE TABLE t (
- id INT GENERATED BY DEFAULT AS IDENTITY,
- data VARCHAR,
- PRIMARY KEY (id)
- )
- .. _postgresql_ss_cursors:
- Server Side Cursors
- -------------------
- Server-side cursor support is available for the psycopg2 and asyncpg
- dialects and may also be available in others.
- Server side cursors are enabled on a per-statement basis by using the
- :paramref:`.Connection.execution_options.stream_results` connection execution
- option::
- with engine.connect() as conn:
- result = conn.execution_options(stream_results=True).execute(text("select * from table"))
- Note that some kinds of SQL statements may not be supported with
- server side cursors; generally, only SQL statements that return rows should be
- used with this option.
- .. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
- and will be removed in a future release. Please use the
- :paramref:`.Connection.execution_options.stream_results` execution option for
- unbuffered cursor support.
- .. seealso::
- :ref:`engine_stream_results`
- .. _postgresql_isolation_level:
- Transaction Isolation Level
- ---------------------------
- Most SQLAlchemy dialects support setting of transaction isolation level
- using the :paramref:`_sa.create_engine.execution_options` parameter
- at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
- level via the :paramref:`.Connection.execution_options.isolation_level`
- parameter.
- For PostgreSQL dialects, this feature works either by making use of the
- DBAPI-specific features, such as psycopg2's isolation level flags which will
- embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
- DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
- TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
- emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
- DBAPI-specific techniques are used, typically an ``.autocommit``
- flag on the DBAPI connection object.
- To set isolation level using :func:`_sa.create_engine`::
- engine = create_engine(
- "postgresql+pg8000://scott:tiger@localhost/test",
- execution_options={
- "isolation_level": "REPEATABLE READ"
- }
- )
- To set using per-connection execution options::
- with engine.connect() as conn:
- conn = conn.execution_options(
- isolation_level="REPEATABLE READ"
- )
- with conn.begin():
- # ... work with transaction
- Valid values for ``isolation_level`` on most PostgreSQL dialects include:
- * ``READ COMMITTED``
- * ``READ UNCOMMITTED``
- * ``REPEATABLE READ``
- * ``SERIALIZABLE``
- * ``AUTOCOMMIT``
- .. seealso::
- :ref:`postgresql_readonly_deferrable`
- :ref:`dbapi_autocommit`
- :ref:`psycopg2_isolation_level`
- :ref:`pg8000_isolation_level`
- .. _postgresql_readonly_deferrable:
- Setting READ ONLY / DEFERRABLE
- ------------------------------
- Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
- characteristics of the transaction, which is in addition to the isolation level
- setting. These two attributes can be established either in conjunction with or
- independently of the isolation level by passing the ``postgresql_readonly`` and
- ``postgresql_deferrable`` flags with
- :meth:`_engine.Connection.execution_options`. The example below illustrates
- passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
- "READ ONLY" and "DEFERRABLE"::
- with engine.connect() as conn:
- conn = conn.execution_options(
- isolation_level="SERIALIZABLE",
- postgresql_readonly=True,
- postgresql_deferrable=True
- )
- with conn.begin():
- # ... work with transaction
- Note that some DBAPIs such as asyncpg only support "readonly" with
- SERIALIZABLE isolation.
- .. versionadded:: 1.4 added support for the ``postgresql_readonly``
- and ``postgresql_deferrable`` execution options.
- .. _postgresql_alternate_search_path:
- Setting Alternate Search Paths on Connect
- ------------------------------------------
- The PostgreSQL ``search_path`` variable refers to the list of schema names
- that will be implicitly searched when a particular table or other
- object is referenced in a SQL statement. As detailed in the next section
- :ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
- the concept of keeping this variable at its default value of ``public``,
- however, in order to have it set to any arbitrary name or names when connections
- are used automatically, the "SET SESSION search_path" command may be invoked
- for all connections in a pool using the following event handler, as discussed
- at :ref:`schema_set_default_connections`::
- from sqlalchemy import event
- from sqlalchemy import create_engine
- engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
- @event.listens_for(engine, "connect", insert=True)
- def set_search_path(dbapi_connection, connection_record):
- existing_autocommit = dbapi_connection.autocommit
- dbapi_connection.autocommit = True
- cursor = dbapi_connection.cursor()
- cursor.execute("SET SESSION search_path='%s'" % schema_name)
- cursor.close()
- dbapi_connection.autocommit = existing_autocommit
- In the example above, ``schema_name`` is a placeholder for the name of the
- schema that should be placed in the search path.
- The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
- attribute is so that when the ``SET SESSION search_path`` directive is invoked,
- it is invoked outside of the scope of any transaction and therefore will not
- be reverted when the DBAPI connection has a rollback.
- .. seealso::
- :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
- .. _postgresql_schema_reflection:
- Remote-Schema Table Introspection and PostgreSQL search_path
- ------------------------------------------------------------
- .. admonition:: Section Best Practices Summarized
- Keep the ``search_path`` variable set to its default of ``public``, without
- any other schema names. For other schema names, name these explicitly
- within :class:`_schema.Table` definitions. Alternatively, the
- ``postgresql_ignore_search_path`` option will cause all reflected
- :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
- attribute set up.
- The PostgreSQL dialect can reflect tables from any schema, as outlined in
- :ref:`schema_table_reflection`.
- With regard to tables which these :class:`_schema.Table`
- objects refer to via foreign key constraint, a decision must be made as to how
- the ``.schema`` is represented in those remote tables, in the case where that
- remote schema name is also a member of the current
- `PostgreSQL search path
- <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
- By default, the PostgreSQL dialect mimics the behavior encouraged by
- PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
- returns a sample definition for a particular foreign key constraint,
- omitting the referenced schema name from that definition when the name is
- also in the PostgreSQL schema search path. The interaction below
- illustrates this behavior::
- test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
- CREATE TABLE
- test=> CREATE TABLE referring(
- test(> id INTEGER PRIMARY KEY,
- test(> referred_id INTEGER REFERENCES test_schema.referred(id));
- CREATE TABLE
- test=> SET search_path TO public, test_schema;
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f'
- test-> ;
- pg_get_constraintdef
- ---------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES referred(id)
- (1 row)
- Above, we created a table ``referred`` as a member of the remote schema
- ``test_schema``, however when we added ``test_schema`` to the
- PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
- ``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
- the function.
- On the other hand, if we set the search path back to the typical default
- of ``public``::
- test=> SET search_path TO public;
- SET
- The same query against ``pg_get_constraintdef()`` now returns the fully
- schema-qualified name for us::
- test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
- test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
- test-> ON n.oid = c.relnamespace
- test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
- test-> WHERE c.relname='referring' AND r.contype = 'f';
- pg_get_constraintdef
- ---------------------------------------------------------------
- FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
- (1 row)
- SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
- in order to determine the remote schema name. That is, if our ``search_path``
- were set to include ``test_schema``, and we invoked a table
- reflection process as follows::
- >>> from sqlalchemy import Table, MetaData, create_engine, text
- >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
- >>> with engine.connect() as conn:
- ... conn.execute(text("SET search_path TO test_schema, public"))
- ... metadata_obj = MetaData()
- ... referring = Table('referring', metadata_obj,
- ... autoload_with=conn)
- ...
- <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
- The above process would deliver to the :attr:`_schema.MetaData.tables`
- collection the
- ``referred`` table named **without** the schema::
- >>> metadata_obj.tables['referred'].schema is None
- True
- To alter the behavior of reflection such that the referred schema is
- maintained regardless of the ``search_path`` setting, use the
- ``postgresql_ignore_search_path`` option, which can be specified as a
- dialect-specific argument to both :class:`_schema.Table` and
- :meth:`_schema.MetaData.reflect`::
- >>> with engine.connect() as conn:
- ... conn.execute(text("SET search_path TO test_schema, public"))
- ... metadata_obj = MetaData()
- ... referring = Table('referring', metadata_obj,
- ... autoload_with=conn,
- ... postgresql_ignore_search_path=True)
- ...
- <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
- We will now have ``test_schema.referred`` stored as schema-qualified::
- >>> metadata_obj.tables['test_schema.referred'].schema
- 'test_schema'
- .. sidebar:: Best Practices for PostgreSQL Schema reflection
- The description of PostgreSQL schema reflection behavior is complex, and
- is the product of many years of dealing with widely varied use cases and
- user preferences. But in fact, there's no need to understand any of it if
- you just stick to the simplest use pattern: leave the ``search_path`` set
- to its default of ``public`` only, never refer to the name ``public`` as
- an explicit schema name otherwise, and refer to all other schema names
- explicitly when building up a :class:`_schema.Table` object. The options
- described here are only for those users who can't, or prefer not to, stay
- within these guidelines.
- Note that **in all cases**, the "default" schema is always reflected as
- ``None``. The "default" schema on PostgreSQL is that which is returned by the
- PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
- installation, this is the name ``public``. So a table that refers to another
- which is in the ``public`` (i.e. default) schema will always have the
- ``.schema`` attribute set to ``None``.
- .. seealso::
- :ref:`reflection_schema_qualified_interaction` - discussion of the issue
- from a backend-agnostic perspective
- `The Schema Search Path
- <https://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
- - on the PostgreSQL website.
- INSERT/UPDATE...RETURNING
- -------------------------
- The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
- ``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
- for single-row INSERT statements in order to fetch newly generated
- primary key identifiers. To specify an explicit ``RETURNING`` clause,
- use the :meth:`._UpdateBase.returning` method on a per-statement basis::
- # INSERT..RETURNING
- result = table.insert().returning(table.c.col1, table.c.col2).\
- values(name='foo')
- print(result.fetchall())
- # UPDATE..RETURNING
- result = table.update().returning(table.c.col1, table.c.col2).\
- where(table.c.name=='foo').values(name='bar')
- print(result.fetchall())
- # DELETE..RETURNING
- result = table.delete().returning(table.c.col1, table.c.col2).\
- where(table.c.name=='foo')
- print(result.fetchall())
- .. _postgresql_insert_on_conflict:
- INSERT...ON CONFLICT (Upsert)
- ------------------------------
- Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
- rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
- candidate row will only be inserted if that row does not violate any unique
- constraints. In the case of a unique constraint violation, a secondary action
- can occur which can be either "DO UPDATE", indicating that the data in the
- target row should be updated, or "DO NOTHING", which indicates to silently skip
- this row.
- Conflicts are determined using existing unique constraints and indexes. These
- constraints may be identified either using their name as stated in DDL,
- or they may be inferred by stating the columns and conditions that comprise
- the indexes.
- SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
- :func:`_postgresql.insert()` function, which provides
- the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
- and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
- .. sourcecode:: pycon+sql
- >>> from sqlalchemy.dialects.postgresql import insert
- >>> insert_stmt = insert(my_table).values(
- ... id='some_existing_id',
- ... data='inserted value')
- >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
- ... index_elements=['id']
- ... )
- >>> print(do_nothing_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO NOTHING
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... constraint='pk_my_table',
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
- .. versionadded:: 1.1
- .. seealso::
- `INSERT .. ON CONFLICT
- <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
- - in the PostgreSQL documentation.
- Specifying the Target
- ^^^^^^^^^^^^^^^^^^^^^
- Both methods supply the "target" of the conflict using either the
- named constraint or by column inference:
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
- specifies a sequence containing string column names, :class:`_schema.Column`
- objects, and/or SQL expression elements, which would identify a unique
- index:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... index_elements=[my_table.c.id],
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- * When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
- infer an index, a partial index can be inferred by also specifying the
- :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
- >>> stmt = stmt.on_conflict_do_update(
- ... index_elements=[my_table.c.user_email],
- ... index_where=my_table.c.user_email.like('%@gmail.com'),
- ... set_=dict(data=stmt.excluded.data)
- ... )
- >>> print(stmt)
- {opensql}INSERT INTO my_table (data, user_email)
- VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
- WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
- used to specify an index directly rather than inferring it. This can be
- the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... constraint='my_table_idx_1',
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
- {stop}
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... constraint='my_table_pk',
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
- {stop}
- * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
- also refer to a SQLAlchemy construct representing a constraint,
- e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
- :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
- if the constraint has a name, it is used directly. Otherwise, if the
- constraint is unnamed, then inference will be used, where the expressions
- and optional WHERE clause of the constraint will be spelled out in the
- construct. This use is especially convenient
- to refer to the named or unnamed primary key of a :class:`_schema.Table`
- using the
- :attr:`_schema.Table.primary_key` attribute:
- .. sourcecode:: pycon+sql
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... constraint=my_table.primary_key,
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- The SET Clause
- ^^^^^^^^^^^^^^^
- ``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
- existing row, using any combination of new values as well as values
- from the proposed insertion. These values are specified using the
- :paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
- parameter accepts a dictionary which consists of direct values
- for UPDATE:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value')
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
- .. warning::
- The :meth:`_expression.Insert.on_conflict_do_update`
- method does **not** take into
- account Python-side default UPDATE values or generation functions, e.g.
- those specified using :paramref:`_schema.Column.onupdate`.
- These values will not be exercised for an ON CONFLICT style of UPDATE,
- unless they are manually specified in the
- :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
- Updating using the Excluded INSERT Values
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- In order to refer to the proposed insertion row, the special alias
- :attr:`~.postgresql.Insert.excluded` is available as an attribute on
- the :class:`_postgresql.Insert` object; this object is a
- :class:`_expression.ColumnCollection`
- alias which contains all columns of the target
- table:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(
- ... id='some_id',
- ... data='inserted value',
- ... author='jlh'
- ... )
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value', author=stmt.excluded.author)
- ... )
- >>> print(do_update_stmt)
- {opensql}INSERT INTO my_table (id, data, author)
- VALUES (%(id)s, %(data)s, %(author)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
- Additional WHERE Criteria
- ^^^^^^^^^^^^^^^^^^^^^^^^^
- The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
- a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
- parameter, which will limit those rows which receive an UPDATE:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(
- ... id='some_id',
- ... data='inserted value',
- ... author='jlh'
- ... )
- >>> on_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value', author=stmt.excluded.author),
- ... where=(my_table.c.status == 2)
- ... )
- >>> print(on_update_stmt)
- {opensql}INSERT INTO my_table (id, data, author)
- VALUES (%(id)s, %(data)s, %(author)s)
- ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
- WHERE my_table.status = %(status_1)s
- Skipping Rows with DO NOTHING
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- ``ON CONFLICT`` may be used to skip inserting a row entirely
- if any conflict with a unique or exclusion constraint occurs; below
- this is illustrated using the
- :meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
- >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
- >>> print(stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT (id) DO NOTHING
- If ``DO NOTHING`` is used without specifying any columns or constraint,
- it has the effect of skipping the INSERT for any unique or exclusion
- constraint violation which occurs:
- .. sourcecode:: pycon+sql
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
- >>> stmt = stmt.on_conflict_do_nothing()
- >>> print(stmt)
- {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
- ON CONFLICT DO NOTHING
- .. _postgresql_match:
- Full Text Search
- ----------------
- SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
- :meth:`_expression.ColumnElement.match` method on any textual column expression.
- On the PostgreSQL dialect, an expression like the following::
- select(sometable.c.text.match("search string"))
- will emit to the database::
- SELECT text @@ to_tsquery('search string') FROM table
- Various other PostgreSQL text search functions such as ``to_tsquery()``,
- ``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using
- the standard SQLAlchemy :data:`.func` construct.
- For example::
- select(func.to_tsvector('fat cats ate rats').match('cat & rat'))
- Emits the equivalent of::
- SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')
- The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
- from sqlalchemy.dialects.postgresql import TSVECTOR
- from sqlalchemy import select, cast
- select(cast("some text", TSVECTOR))
- produces a statement equivalent to::
- SELECT CAST('some text' AS TSVECTOR) AS anon_1
- .. tip::
- It's important to remember that text searching in PostgreSQL is powerful but complicated,
- and SQLAlchemy users are advised to reference the PostgreSQL documentation
- regarding
- `Full Text Search <https://www.postgresql.org/docs/13/textsearch-controls.html>`_.
- There are important differences between ``to_tsquery`` and
- ``plainto_tsquery``, the most significant of which is that ``to_tsquery``
- expects specially formatted "querytext" that is written to PostgreSQL's own
- specification, while ``plainto_tsquery`` expects unformatted text that is
- transformed into ``to_tsquery`` compatible querytext. This means the input to
- ``.match()`` under PostgreSQL may be incompatible with the input to
- ``.match()`` under another database backend. SQLAlchemy users who support
- multiple backends are advised to carefully implement their usage of
- ``.match()`` to work around these constraints.
- Full Text Searches in PostgreSQL are influenced by a combination of: the
- PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
- to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
- during a query.
- When performing a Full Text Search against a column that has a GIN or
- GiST index that is already pre-computed (which is common on full text
- searches) one may need to explicitly pass in a particular PostgreSQL
- ``regconfig`` value to ensure the query-planner utilizes the index and does
- not re-compute the column on demand.
- In order to provide for this explicit query planning, or to use different
- search strategies, the ``match`` method accepts a ``postgresql_regconfig``
- keyword argument::
- select(mytable.c.id).where(
- mytable.c.title.match('somestring', postgresql_regconfig='english')
- )
- Emits the equivalent of::
- SELECT mytable.id FROM mytable
- WHERE mytable.title @@ to_tsquery('english', 'somestring')
- One can also specifically pass in a ``'regconfig'`` value to the
- ``to_tsvector()`` command as the initial argument::
- select(mytable.c.id).where(
- func.to_tsvector('english', mytable.c.title )\
- .match('somestring', postgresql_regconfig='english')
- )
- produces a statement equivalent to::
- SELECT mytable.id FROM mytable
- WHERE to_tsvector('english', mytable.title) @@
- to_tsquery('english', 'somestring')
- It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
- PostgreSQL to ensure that you are generating queries with SQLAlchemy that
- take full advantage of any indexes you may have created for full text search.
- FROM ONLY ...
- -------------
- The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
- table in an inheritance hierarchy. This can be used to produce the
- ``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
- syntaxes. It uses SQLAlchemy's hints mechanism::
- # SELECT ... FROM ONLY ...
- result = table.select().with_hint(table, 'ONLY', 'postgresql')
- print(result.fetchall())
- # UPDATE ONLY ...
- table.update(values=dict(foo='bar')).with_hint('ONLY',
- dialect_name='postgresql')
- # DELETE FROM ONLY ...
- table.delete().with_hint('ONLY', dialect_name='postgresql')
- .. _postgresql_indexes:
- PostgreSQL-Specific Index Options
- ---------------------------------
- Several extensions to the :class:`.Index` construct are available, specific
- to the PostgreSQL dialect.
- Covering Indexes
- ^^^^^^^^^^^^^^^^
- The ``postgresql_include`` option renders INCLUDE(colname) for the given
- string names::
- Index("my_index", table.c.x, postgresql_include=['y'])
- would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
- Note that this feature requires PostgreSQL 11 or later.
- .. versionadded:: 1.4
- .. _postgresql_partial_indexes:
- Partial Indexes
- ^^^^^^^^^^^^^^^
- Partial indexes add criteria to the index definition so that the index is
- applied to a subset of rows. These can be specified on :class:`.Index`
- using the ``postgresql_where`` keyword argument::
- Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
- .. _postgresql_operator_classes:
- Operator Classes
- ^^^^^^^^^^^^^^^^
- PostgreSQL allows the specification of an *operator class* for each column of
- an index (see
- https://www.postgresql.org/docs/8.3/interactive/indexes-opclass.html).
- The :class:`.Index` construct allows these to be specified via the
- ``postgresql_ops`` keyword argument::
- Index(
- 'my_index', my_table.c.id, my_table.c.data,
- postgresql_ops={
- 'data': 'text_pattern_ops',
- 'id': 'int4_ops'
- })
- Note that the keys in the ``postgresql_ops`` dictionaries are the
- "key" name of the :class:`_schema.Column`, i.e. the name used to access it from
- the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
- different than the actual name of the column as expressed in the database.
- If ``postgresql_ops`` is to be used against a complex SQL expression such
- as a function call, then to apply to the column it must be given a label
- that is identified in the dictionary by name, e.g.::
- Index(
- 'my_index', my_table.c.id,
- func.lower(my_table.c.data).label('data_lower'),
- postgresql_ops={
- 'data_lower': 'text_pattern_ops',
- 'id': 'int4_ops'
- })
- Operator classes are also supported by the
- :class:`_postgresql.ExcludeConstraint` construct using the
- :paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
- details.
- .. versionadded:: 1.3.21 added support for operator classes with
- :class:`_postgresql.ExcludeConstraint`.
- Index Types
- ^^^^^^^^^^^
- PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
- as the ability for users to create their own (see
- https://www.postgresql.org/docs/8.3/static/indexes-types.html). These can be
- specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
- Index('my_index', my_table.c.data, postgresql_using='gin')
- The value passed to the keyword argument will be simply passed through to the
- underlying CREATE INDEX command, so it *must* be a valid index type for your
- version of PostgreSQL.
- .. _postgresql_index_storage:
- Index Storage Parameters
- ^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL allows storage parameters to be set on indexes. The storage
- parameters available depend on the index method used by the index. Storage
- parameters can be specified on :class:`.Index` using the ``postgresql_with``
- keyword argument::
- Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})
- .. versionadded:: 1.0.6
- PostgreSQL allows defining the tablespace in which to create the index.
- The tablespace can be specified on :class:`.Index` using the
- ``postgresql_tablespace`` keyword argument::
- Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')
- .. versionadded:: 1.1
- Note that the same option is available on :class:`_schema.Table` as well.
- .. _postgresql_index_concurrently:
- Indexes with CONCURRENTLY
- ^^^^^^^^^^^^^^^^^^^^^^^^^
- The PostgreSQL index option CONCURRENTLY is supported by passing the
- flag ``postgresql_concurrently`` to the :class:`.Index` construct::
- tbl = Table('testtbl', m, Column('data', Integer))
- idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)
- The above index construct will render DDL for CREATE INDEX, assuming
- PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::
- CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
- For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
- a connection-less dialect, it will emit::
- DROP INDEX CONCURRENTLY test_idx1
- .. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
- CONCURRENTLY keyword is now only emitted if a high enough version
- of PostgreSQL is detected on the connection (or for a connection-less
- dialect).
- When using CONCURRENTLY, the PostgreSQL database requires that the statement
- be invoked outside of a transaction block. The Python DBAPI enforces that
- even for a single statement, a transaction is present, so to use this
- construct, the DBAPI's "autocommit" mode must be used::
- metadata = MetaData()
- table = Table(
- "foo", metadata,
- Column("id", String))
- index = Index(
- "foo_idx", table.c.id, postgresql_concurrently=True)
- with engine.connect() as conn:
- with conn.execution_options(isolation_level='AUTOCOMMIT'):
- table.create(conn)
- .. seealso::
- :ref:`postgresql_isolation_level`
- .. _postgresql_index_reflection:
- PostgreSQL Index Reflection
- ---------------------------
- The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
- UNIQUE CONSTRAINT construct is used. When inspecting a table using
- :class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
- and the :meth:`_reflection.Inspector.get_unique_constraints`
- will report on these
- two constructs distinctly; in the case of the index, the key
- ``duplicates_constraint`` will be present in the index entry if it is
- detected as mirroring a constraint. When performing reflection using
- ``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
- in :attr:`_schema.Table.indexes` when it is detected as mirroring a
- :class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
- collection.
- .. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
- :class:`.UniqueConstraint` objects present in the
- :attr:`_schema.Table.constraints`
- collection; the PostgreSQL backend will no longer include a "mirrored"
- :class:`.Index` construct in :attr:`_schema.Table.indexes`
- if it is detected
- as corresponding to a unique constraint.
- Special Reflection Options
- --------------------------
- The :class:`_reflection.Inspector`
- used for the PostgreSQL backend is an instance
- of :class:`.PGInspector`, which offers additional methods::
- from sqlalchemy import create_engine, inspect
- engine = create_engine("postgresql+psycopg2://localhost/test")
- insp = inspect(engine) # will be a PGInspector
- print(insp.get_enums())
- .. autoclass:: PGInspector
- :members:
- .. _postgresql_table_options:
- PostgreSQL Table Options
- ------------------------
- Several options for CREATE TABLE are supported directly by the PostgreSQL
- dialect in conjunction with the :class:`_schema.Table` construct:
- * ``TABLESPACE``::
- Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')
- The above option is also available on the :class:`.Index` construct.
- * ``ON COMMIT``::
- Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')
- * ``WITH OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=True)
- * ``WITHOUT OIDS``::
- Table("some_table", metadata, ..., postgresql_with_oids=False)
- * ``INHERITS``::
- Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
- Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
- .. versionadded:: 1.0.0
- * ``PARTITION BY``::
- Table("some_table", metadata, ...,
- postgresql_partition_by='LIST (part_column)')
- .. versionadded:: 1.2.6
- .. seealso::
- `PostgreSQL CREATE TABLE options
- <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_
- .. _postgresql_table_valued_overview:
- Table values, Table and Column valued functions, Row and Tuple objects
- -----------------------------------------------------------------------
- PostgreSQL makes great use of modern SQL forms such as table-valued functions,
- tables and rows as values. These constructs are commonly used as part
- of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
- datatypes. SQLAlchemy's SQL expression language has native support for
- most table-valued and row-valued forms.
- .. _postgresql_table_valued:
- Table-Valued Functions
- ^^^^^^^^^^^^^^^^^^^^^^^
- Many PostgreSQL built-in functions are intended to be used in the FROM clause
- of a SELECT statement, and are capable of returning table rows or sets of table
- rows. A large portion of PostgreSQL's JSON functions, such as
- ``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
- ``json_each()``, ``json_to_record()``, and ``json_populate_recordset()``, use
- such forms. These classes of SQL function calling forms in SQLAlchemy are available
- using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
- with :class:`_functions.Function` objects generated from the :data:`_sql.func`
- namespace.
- Examples from PostgreSQL's reference documentation follow below:
- * ``json_each()``::
- >>> from sqlalchemy import select, func
- >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value"))
- >>> print(stmt)
- SELECT anon_1.key, anon_1.value
- FROM json_each(:json_each_1) AS anon_1
- * ``json_populate_record()``::
- >>> from sqlalchemy import select, func, literal_column
- >>> stmt = select(
- ... func.json_populate_record(
- ... literal_column("null::myrowtype"),
- ... '{"a":1,"b":2}'
- ... ).table_valued("a", "b", name="x")
- ... )
- >>> print(stmt)
- SELECT x.a, x.b
- FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
- * ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
- columns in the alias, where we may make use of :func:`_sql.column` elements with
- types to produce them. The :meth:`_functions.FunctionElement.table_valued`
- method produces a :class:`_sql.TableValuedAlias` construct, and the
- :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
- columns specification::
- >>> from sqlalchemy import select, func, column, Integer, Text
- >>> stmt = select(
- ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued(
- ... column("a", Integer), column("b", Text), column("d", Text),
- ... ).render_derived(name="x", with_types=True)
- ... )
- >>> print(stmt)
- SELECT x.a, x.b, x.d
- FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
- * ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
- ordinal counter to the output of a function and is accepted by a limited set
- of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
- :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
- parameter ``with_ordinality`` for this purpose, which accepts the string name
- that will be applied to the "ordinality" column::
- >>> from sqlalchemy import select, func
- >>> stmt = select(
- ... func.generate_series(4, 1, -1).table_valued("value", with_ordinality="ordinality")
- ... )
- >>> print(stmt)
- SELECT anon_1.value, anon_1.ordinality
- FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) WITH ORDINALITY AS anon_1
- .. versionadded:: 1.4.0b2
- .. seealso::
- :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
- .. _postgresql_column_valued:
- Column Valued Functions
- ^^^^^^^^^^^^^^^^^^^^^^^
- Similar to the table valued function, a column valued function is present
- in the FROM clause, but delivers itself to the columns clause as a single
- scalar value. PostgreSQL functions such as ``json_array_elements()``,
- ``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the
- :meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`:
- * ``json_array_elements()``::
- >>> from sqlalchemy import select, func
- >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x"))
- >>> print(stmt)
- SELECT x
- FROM json_array_elements(:json_array_elements_1) AS x
- * ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
- :func:`_postgresql.array` construct may be used::
- >>> from sqlalchemy.dialects.postgresql import array
- >>> from sqlalchemy import select, func
- >>> stmt = select(func.unnest(array([1, 2])).column_valued())
- >>> print(stmt)
- SELECT anon_1
- FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
- The function can of course be used against an existing table-bound column
- that's of type :class:`_types.ARRAY`::
- >>> from sqlalchemy import table, column, ARRAY, Integer
- >>> from sqlalchemy import select, func
- >>> t = table("t", column('value', ARRAY(Integer)))
- >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
- >>> print(stmt)
- SELECT unnested_value
- FROM unnest(t.value) AS unnested_value
- .. seealso::
- :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
- Row Types
- ^^^^^^^^^
- Built-in support for rendering a ``ROW`` may be approximated using
- ``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
- :func:`_sql.tuple_` construct::
- >>> from sqlalchemy import table, column, func, tuple_
- >>> t = table("t", column("id"), column("fk"))
- >>> stmt = t.select().where(
- ... tuple_(t.c.id, t.c.fk) > (1,2)
- ... ).where(
- ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)
- ... )
- >>> print(stmt)
- SELECT t.id, t.fk
- FROM t
- WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
- .. seealso::
- `PostgreSQL Row Constructors
- <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
- `PostgreSQL Row Constructor Comparison
- <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
- Table Types passed to Functions
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- PostgreSQL supports passing a table as an argument to a function, which it
- refers to as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
- such as :class:`_schema.Table` support this special form using the
- :meth:`_sql.FromClause.table_valued` method, which is comparable to the
- :meth:`_functions.FunctionElement.table_valued` method except that the collection
- of columns is already established by that of the :class:`_sql.FromClause`
- itself::
- >>> from sqlalchemy import table, column, func, select
- >>> a = table( "a", column("id"), column("x"), column("y"))
- >>> stmt = select(func.row_to_json(a.table_valued()))
- >>> print(stmt)
- SELECT row_to_json(a) AS row_to_json_1
- FROM a
- .. versionadded:: 1.4.0b2
- ARRAY Types
- -----------
- The PostgreSQL dialect supports arrays, both as multidimensional column types
- as well as array literals:
- * :class:`_postgresql.ARRAY` - ARRAY datatype
- * :class:`_postgresql.array` - array literal
- * :func:`_postgresql.array_agg` - ARRAY_AGG SQL function
- * :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
- function syntax.
- JSON Types
- ----------
- The PostgreSQL dialect supports both JSON and JSONB datatypes, including
- psycopg2's native support and support for all of PostgreSQL's special
- operators:
- * :class:`_postgresql.JSON`
- * :class:`_postgresql.JSONB`
- HSTORE Type
- -----------
- The PostgreSQL HSTORE type as well as hstore literals are supported:
- * :class:`_postgresql.HSTORE` - HSTORE datatype
- * :class:`_postgresql.hstore` - hstore literal
- ENUM Types
- ----------
- PostgreSQL has an independently creatable TYPE structure which is used
- to implement an enumerated type. This approach introduces significant
- complexity on the SQLAlchemy side in terms of when this type should be
- CREATED and DROPPED. The type object is also an independently reflectable
- entity. The following sections should be consulted:
- * :class:`_postgresql.ENUM` - DDL and typing support for ENUM.
- * :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types
- * :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
- CREATE and DROP commands for ENUM.
- .. _postgresql_array_of_enum:
- Using ENUM with ARRAY
- ^^^^^^^^^^^^^^^^^^^^^
- The combination of ENUM and ARRAY is not directly supported by backend
- DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround
- was needed in order to allow this combination to work, described below.
- .. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
- handled by SQLAlchemy's implementation without any workarounds needed.
- .. sourcecode:: python
- import re
- import sqlalchemy as sa
- from sqlalchemy import TypeDecorator
- from sqlalchemy.dialects.postgresql import ARRAY
- class ArrayOfEnum(TypeDecorator):
- impl = ARRAY
- def bind_expression(self, bindvalue):
- return sa.cast(bindvalue, self)
- def result_processor(self, dialect, coltype):
- super_rp = super(ArrayOfEnum, self).result_processor(
- dialect, coltype)
- def handle_raw_string(value):
- inner = re.match(r"^{(.*)}$", value).group(1)
- return inner.split(",") if inner else []
- def process(value):
- if value is None:
- return None
- return super_rp(handle_raw_string(value))
- return process
- E.g.::
- Table(
- 'mydata', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', ArrayOfEnum(ENUM('a', 'b', 'c', name='myenum')))
- )
- This type is not included as a built-in type as it would be incompatible
- with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
- a new version.
- .. _postgresql_array_of_json:
- Using JSON/JSONB with ARRAY
- ^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
- we need to render the appropriate CAST. Current psycopg2 drivers accommodate
- the result set correctly without any special steps.
- .. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
- directly handled by SQLAlchemy's implementation without any workarounds
- needed.
- .. sourcecode:: python
- class CastingArray(ARRAY):
- def bind_expression(self, bindvalue):
- return sa.cast(bindvalue, self)
- E.g.::
- Table(
- 'mydata', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', CastingArray(JSONB))
- )
- """ # noqa E501
- from collections import defaultdict
- import datetime as dt
- import re
- from uuid import UUID as _python_UUID
- from . import array as _array
- from . import hstore as _hstore
- from . import json as _json
- from . import ranges as _ranges
- from ... import exc
- from ... import schema
- from ... import sql
- from ... import util
- from ...engine import characteristics
- from ...engine import default
- from ...engine import reflection
- from ...sql import coercions
- from ...sql import compiler
- from ...sql import elements
- from ...sql import expression
- from ...sql import roles
- from ...sql import sqltypes
- from ...sql import util as sql_util
- from ...sql.ddl import DDLBase
- from ...types import BIGINT
- from ...types import BOOLEAN
- from ...types import CHAR
- from ...types import DATE
- from ...types import FLOAT
- from ...types import INTEGER
- from ...types import NUMERIC
- from ...types import REAL
- from ...types import SMALLINT
- from ...types import TEXT
- from ...types import VARCHAR
# Pattern used to validate the ``postgresql_using`` index option before it
# is rendered into CREATE INDEX ... USING <method>.
IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)

# Matches statements that begin (after optional whitespace) with a
# data-changing or DDL keyword.
# NOTE(review): judging by the name this drives autocommit detection;
# the consumer is not visible in this part of the file.
AUTOCOMMIT_REGEXP = re.compile(
    r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
    "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
    re.I | re.UNICODE,
)

# Lower-case words treated as reserved; written as a whitespace-separated
# word list and split into a set at import time.
RESERVED_WORDS = set(
    """
    all analyse analyze and any array as asc asymmetric both
    case cast check collate column constraint create
    current_catalog current_date current_role current_time
    current_timestamp current_user
    default deferrable desc distinct do
    else end except
    false fetch for foreign from
    grant group having
    in initially intersect into
    leading limit localtime localtimestamp
    new not null
    of off offset old on only or order
    placing primary
    references returning
    select session_user some symmetric
    table then to trailing true
    union unique user using
    variadic
    when where window with
    authorization between binary cross current_schema freeze full
    ilike inner is isnull join left like natural notnull
    outer over overlaps right similar verbose
    """.split()
)

# NOTE(review): these look like PostgreSQL type OIDs grouped by numeric
# family (decimal / float / integer) - confirm against the DBAPI modules
# that consume them.
_DECIMAL_TYPES = (1231, 1700)
_FLOAT_TYPES = (700, 701, 1021, 1022)
_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
class BYTEA(sqltypes.LargeBinary):
    """PostgreSQL BYTEA type, a :class:`_types.LargeBinary` subclass
    compiled under the ``BYTEA`` visit name.
    """

    __visit_name__ = "BYTEA"
class DOUBLE_PRECISION(sqltypes.Float):
    """PostgreSQL DOUBLE PRECISION type, a :class:`_types.Float` subclass
    compiled under the ``DOUBLE_PRECISION`` visit name.
    """

    __visit_name__ = "DOUBLE_PRECISION"
class INET(sqltypes.TypeEngine):
    """PostgreSQL INET type, compiled under the ``INET`` visit name."""

    __visit_name__ = "INET"


# alternate name for the same class
PGInet = INET
class CIDR(sqltypes.TypeEngine):
    """PostgreSQL CIDR type, compiled under the ``CIDR`` visit name."""

    __visit_name__ = "CIDR"


# alternate name for the same class
PGCidr = CIDR
class MACADDR(sqltypes.TypeEngine):
    """PostgreSQL MACADDR type, compiled under the ``MACADDR`` visit name."""

    __visit_name__ = "MACADDR"


# alternate name for the same class
PGMacAddr = MACADDR
class MONEY(sqltypes.TypeEngine):

    r"""Provide the PostgreSQL MONEY type.

    Depending on driver, result rows using this type may return a
    string value which includes currency symbols.

    For this reason, it may be preferable to provide conversion to a
    numerically-based currency datatype using :class:`_types.TypeDecorator`::

        import re
        import decimal
        from typing import Any

        from sqlalchemy import TypeDecorator

        class NumericMoney(TypeDecorator):
            impl = MONEY

            def process_result_value(self, value: Any, dialect: Any) -> Any:
                if value is not None:
                    # adjust this for the currency and numeric
                    m = re.match(r"\$([\d.]+)", value)
                    if m:
                        value = decimal.Decimal(m.group(1))
                return value

    Alternatively, the conversion may be applied as a CAST using
    the :meth:`_types.TypeDecorator.column_expression` method as follows::

        from typing import Any

        from sqlalchemy import cast
        from sqlalchemy import Numeric
        from sqlalchemy import TypeDecorator

        class NumericMoney(TypeDecorator):
            impl = MONEY

            def column_expression(self, column: Any):
                return cast(column, Numeric())

    .. versionadded:: 1.2

    """

    __visit_name__ = "MONEY"
class OID(sqltypes.TypeEngine):

    """Provide the PostgreSQL OID type.

    The type carries no parameters and is compiled under the ``OID``
    visit name.

    .. versionadded:: 0.9.5

    """

    __visit_name__ = "OID"
class REGCLASS(sqltypes.TypeEngine):

    """Provide the PostgreSQL REGCLASS type.

    The type carries no parameters and is compiled under the ``REGCLASS``
    visit name.

    .. versionadded:: 1.2.7

    """

    __visit_name__ = "REGCLASS"
class TIMESTAMP(sqltypes.TIMESTAMP):
    """TIMESTAMP type that additionally records a ``precision`` value."""

    def __init__(self, timezone=False, precision=None):
        """Construct a TIMESTAMP.

        :param timezone: forwarded to the base TIMESTAMP constructor.
        :param precision: optional precision value, stored unchanged on
         the type as ``self.precision``.

        """
        super(TIMESTAMP, self).__init__(timezone=timezone)
        self.precision = precision
class TIME(sqltypes.TIME):
    """TIME type that additionally records a ``precision`` value."""

    def __init__(self, timezone=False, precision=None):
        """Construct a TIME.

        :param timezone: forwarded to the base TIME constructor.
        :param precision: optional precision value, stored unchanged on
         the type as ``self.precision``.

        """
        super(TIME, self).__init__(timezone=timezone)
        self.precision = precision
class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):

    """PostgreSQL INTERVAL type."""

    __visit_name__ = "INTERVAL"
    native = True

    def __init__(self, precision=None, fields=None):
        """Construct an INTERVAL.

        :param precision: optional integer precision value
        :param fields: string fields specifier.  allows storage of fields
         to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
         etc.

         .. versionadded:: 1.2

        """
        self.fields = fields
        self.precision = precision

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Return an :class:`INTERVAL` whose precision is taken from the
        ``second_precision`` of the given interval type; additional
        keyword arguments are ignored.
        """
        native_precision = interval.second_precision
        return INTERVAL(precision=native_precision)

    @property
    def _type_affinity(self):
        # group this type with the generic Interval type
        return sqltypes.Interval

    def as_generic(self, allow_nulltype=False):
        """Return a generic :class:`_types.Interval` with ``native=True``
        and this type's precision as ``second_precision``.
        """
        return sqltypes.Interval(native=True, second_precision=self.precision)

    @property
    def python_type(self):
        """The Python type for values of this type: ``timedelta``."""
        return dt.timedelta

    def coerce_compared_value(self, op, value):
        """Always return this type itself for the compared value."""
        return self


# alternate name for the same class
PGInterval = INTERVAL
class BIT(sqltypes.TypeEngine):
    """PostgreSQL BIT type, optionally VARYING."""

    __visit_name__ = "BIT"

    def __init__(self, length=None, varying=False):
        """Construct a BIT.

        :param length: bit length.  For a non-varying BIT a missing or
         zero length falls back to 1; for BIT VARYING it is stored as
         given, so ``None`` remains ``None``.
        :param varying: whether this is a BIT VARYING type.

        """
        if varying:
            # BIT VARYING can be unlimited-length, so no default
            self.length = length
        else:
            # BIT without VARYING defaults to length 1
            self.length = length or 1
        self.varying = varying


# alternate name for the same class
PGBit = BIT
class UUID(sqltypes.TypeEngine):

    """PostgreSQL UUID type.

    Represents the UUID column type, interpreting
    data either as natively returned by the DBAPI
    or as Python uuid objects.

    The UUID type is currently known to work within the prominent DBAPI
    drivers supported by SQLAlchemy including psycopg2, pg8000 and
    asyncpg. Support for other DBAPI drivers may be incomplete or non-present.

    """

    __visit_name__ = "UUID"

    def __init__(self, as_uuid=False):
        """Construct a UUID type.


        :param as_uuid=False: if True, values will be interpreted
         as Python uuid objects, converting to/from string via the
         DBAPI.

        """
        self.as_uuid = as_uuid

    def coerce_compared_value(self, op, value):
        """See :meth:`.TypeEngine.coerce_compared_value` for a description."""
        # plain strings are compared using this UUID type; anything else
        # is left to the default coercion
        if not isinstance(value, util.string_types):
            return super(UUID, self).coerce_compared_value(op, value)
        return self

    def bind_processor(self, dialect):
        """Return a converter of bound values to text when ``as_uuid`` is
        set, else ``None``.  ``None`` values pass through unchanged.
        """
        if not self.as_uuid:
            return None

        def process(value):
            return value if value is None else util.text_type(value)

        return process

    def result_processor(self, dialect, coltype):
        """Return a converter of result values to Python ``uuid.UUID``
        objects when ``as_uuid`` is set, else ``None``.  ``None`` values
        pass through unchanged.
        """
        if not self.as_uuid:
            return None

        def process(value):
            return value if value is None else _python_UUID(value)

        return process

    def literal_processor(self, dialect):
        """Return a function rendering a value as a quoted SQL literal.

        With ``as_uuid`` set the literal carries a ``::UUID`` cast
        suffix; otherwise it is a plain quoted string.  ``None`` values
        pass through unchanged.
        """
        # the template is chosen once, when the processor is created
        template = "'%s'::UUID" if self.as_uuid else "'%s'"

        def process(value):
            if value is not None:
                value = template % value
            return value

        return process


# alternate name for the same class
PGUuid = UUID
class TSVECTOR(sqltypes.TypeEngine):

    """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL
    text search type TSVECTOR.

    It can be used to do full text queries on natural language
    documents.

    The type takes no parameters and is compiled under the ``TSVECTOR``
    visit name.

    .. versionadded:: 0.9.0

    .. seealso::

        :ref:`postgresql_match`

    """

    __visit_name__ = "TSVECTOR"
class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):

    """PostgreSQL ENUM type.

    This is a subclass of :class:`_types.Enum` which includes
    support for PG's ``CREATE TYPE`` and ``DROP TYPE``.

    When the builtin type :class:`_types.Enum` is used and the
    :paramref:`.Enum.native_enum` flag is left at its default of
    True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
    type as the implementation, so the special create/drop rules
    will be used.

    The create/drop behavior of ENUM is necessarily intricate, due to the
    awkward relationship the ENUM type has in relationship to the
    parent table, in that it may be "owned" by just a single table, or
    may be shared among many tables.

    When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
    in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
    corresponding to when the :meth:`_schema.Table.create` and
    :meth:`_schema.Table.drop`
    methods are called::

        table = Table('sometable', metadata,
            Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
        )

        table.create(engine)  # will emit CREATE TYPE and CREATE TABLE
        table.drop(engine)  # will emit DROP TABLE and DROP TYPE

    To use a common enumerated type between multiple tables, the best
    practice is to declare the :class:`_types.Enum` or
    :class:`_postgresql.ENUM` independently, and associate it with the
    :class:`_schema.MetaData` object itself::

        my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)

        t1 = Table('sometable_one', metadata,
            Column('some_enum', my_enum)
        )

        t2 = Table('sometable_two', metadata,
            Column('some_enum', my_enum)
        )

    When this pattern is used, care must still be taken at the level
    of individual table creates.  Emitting CREATE TABLE without also
    specifying ``checkfirst=True`` will still cause issues::

        t1.create(engine)  # will fail: no such type 'myenum'

    If we specify ``checkfirst=True``, the individual table-level create
    operation will check for the ``ENUM`` and create if not exists::

        # will check if enum exists, and emit CREATE TYPE if not
        t1.create(engine, checkfirst=True)

    When using a metadata-level ENUM type, the type will always be created
    and dropped if either the metadata-wide create/drop is called::

        metadata.create_all(engine)  # will emit CREATE TYPE
        metadata.drop_all(engine)  # will emit DROP TYPE

    The type can also be created and dropped directly::

        my_enum.create(engine)
        my_enum.drop(engine)

    .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
       now behaves more strictly with regards to CREATE/DROP.  A metadata-level
       ENUM type will only be created and dropped at the metadata level,
       not the table level, with the exception of
       ``table.create(checkfirst=True)``.
       The ``table.drop()`` call will now emit a DROP TYPE for a table-level
       enumerated type.

    """

    native_enum = True

    def __init__(self, *enums, **kw):
        """Construct an :class:`_postgresql.ENUM`.

        Arguments are the same as that of
        :class:`_types.Enum`, but also including
        the following parameters.

        :param create_type: Defaults to True.
         Indicates that ``CREATE TYPE`` should be
         emitted, after optionally checking for the
         presence of the type, when the parent
         table is being created; and additionally
         that ``DROP TYPE`` is called when the table
         is dropped.    When ``False``, no check
         will be performed and no ``CREATE TYPE``
         or ``DROP TYPE`` is emitted, unless
         :meth:`~.postgresql.ENUM.create`
         or :meth:`~.postgresql.ENUM.drop`
         are called directly.
         Setting to ``False`` is helpful
         when invoking a creation scheme to a SQL file
         without access to the actual database -
         the :meth:`~.postgresql.ENUM.create` and
         :meth:`~.postgresql.ENUM.drop` methods can
         be used to emit SQL to a target bind.

        """
        # ``native_enum`` is consumed here and never forwarded to the
        # base constructor; only an explicit False triggers the warning
        native_enum = kw.pop("native_enum", None)
        if native_enum is False:
            util.warn(
                "the native_enum flag does not apply to the "
                "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
                "always refers to ENUM.   Use sqlalchemy.types.Enum for "
                "non-native enum."
            )
        self.create_type = kw.pop("create_type", True)
        super(ENUM, self).__init__(*enums, **kw)

    @classmethod
    def adapt_emulated_to_native(cls, impl, **kw):
        """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
        :class:`.Enum`.

        Settings of ``impl`` are used as defaults; any of them given
        explicitly in ``kw`` take precedence.

        """
        kw.setdefault("validate_strings", impl.validate_strings)
        kw.setdefault("name", impl.name)
        kw.setdefault("schema", impl.schema)
        kw.setdefault("inherit_schema", impl.inherit_schema)
        kw.setdefault("metadata", impl.metadata)
        kw.setdefault("_create_events", False)
        kw.setdefault("values_callable", impl.values_callable)
        kw.setdefault("omit_aliases", impl._omit_aliases)
        return cls(**kw)

    def create(self, bind=None, checkfirst=True):
        """Emit ``CREATE TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL CREATE TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.  Although the signature defaults this to ``None``, the
         method dereferences ``bind.dialect`` unconditionally, so a
         connectable must be passed.
        :param checkfirst: if ``True``, a query against
         the PG catalog will be first performed to see
         if the type does not exist already before
         creating.

        """
        if not bind.dialect.supports_native_enum:
            return

        bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst)

    def drop(self, bind=None, checkfirst=True):
        """Emit ``DROP TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL DROP TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.  Although the signature defaults this to ``None``, the
         method dereferences ``bind.dialect`` unconditionally, so a
         connectable must be passed.
        :param checkfirst: if ``True``, a query against
         the PG catalog will be first performed to see
         if the type actually exists before dropping.

        """
        if not bind.dialect.supports_native_enum:
            return

        bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst)

    class EnumGenerator(DDLBase):
        """DDL visitor that executes ``CreateEnumType`` for an enum,
        optionally consulting ``dialect.has_type()`` first.
        """

        def __init__(self, dialect, connection, checkfirst=False, **kwargs):
            super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
            self.checkfirst = checkfirst

        def _can_create_enum(self, enum):
            """Return True if the type should be created: always when
            ``checkfirst`` is off, otherwise only when the dialect reports
            that the type does not exist in the effective schema.
            """
            if not self.checkfirst:
                return True

            effective_schema = self.connection.schema_for_object(enum)

            return not self.connection.dialect.has_type(
                self.connection, enum.name, schema=effective_schema
            )

        def visit_enum(self, enum):
            if not self._can_create_enum(enum):
                return

            self.connection.execute(CreateEnumType(enum))

    class EnumDropper(DDLBase):
        """DDL visitor that executes ``DropEnumType`` for an enum,
        optionally consulting ``dialect.has_type()`` first.
        """

        def __init__(self, dialect, connection, checkfirst=False, **kwargs):
            super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
            self.checkfirst = checkfirst

        def _can_drop_enum(self, enum):
            """Return True if the type should be dropped: always when
            ``checkfirst`` is off, otherwise only when the dialect reports
            that the type exists in the effective schema.
            """
            if not self.checkfirst:
                return True

            effective_schema = self.connection.schema_for_object(enum)

            return self.connection.dialect.has_type(
                self.connection, enum.name, schema=effective_schema
            )

        def visit_enum(self, enum):
            if not self._can_drop_enum(enum):
                return

            self.connection.execute(DropEnumType(enum))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This to ensure a particular named enum is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".

        Returns True when the caller should skip the operation: either
        ``create_type`` is off, or this ``(schema, name)`` pair was
        already recorded.  Returns False when no ``_ddl_runner`` is
        present in ``kw``.

        """
        if not self.create_type:
            return True
        if "_ddl_runner" not in kw:
            return False

        ddl_runner = kw["_ddl_runner"]
        if "_pg_enums" in ddl_runner.memo:
            pg_enums = ddl_runner.memo["_pg_enums"]
        else:
            pg_enums = ddl_runner.memo["_pg_enums"] = set()
        # check before adding, so the first caller for a name gets False
        present = (self.schema, self.name) in pg_enums
        pg_enums.add((self.schema, self.name))
        return present

    def _on_table_create(self, target, bind, checkfirst=False, **kw):
        # create with the table when checkfirst is requested, or when the
        # type has no metadata and this is not a metadata-wide operation.
        # The memo lookup records the name as a side effect, so it must
        # stay last in the condition and run only when the rest holds.
        if (
            checkfirst
            or (
                not self.metadata
                and not kw.get("_is_metadata_operation", False)
            )
        ) and not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst=False, **kw):
        # drop with the table only for a type without metadata, outside
        # of a metadata-wide operation, and not yet noted in the memos
        if (
            not self.metadata
            and not kw.get("_is_metadata_operation", False)
            and not self._check_for_name_in_memos(checkfirst, kw)
        ):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
        # metadata-level create: skipped only via the memo check
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
        # metadata-level drop: skipped only via the memo check
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)
class _ColonCast(elements.Cast):
    """Cast element compiled under the ``colon_cast`` visit name.

    The constructor assigns its attributes directly and does not invoke
    the :class:`.Cast` constructor.
    """

    __visit_name__ = "colon_cast"

    def __init__(self, expression, type_):
        self.clause = expression
        self.type = type_
        self.typeclause = elements.TypeClause(type_)
# generic SQLAlchemy types mapped to the PostgreSQL-specific
# implementation classes that stand in for them
colspecs = {
    sqltypes.ARRAY: _array.ARRAY,
    sqltypes.Interval: INTERVAL,
    sqltypes.Enum: ENUM,
    sqltypes.JSON.JSONPathType: _json.JSONPathType,
    sqltypes.JSON: _json.JSON,
}

# type names, keyed as strings, mapped to type classes
# NOTE(review): presumably consulted when reflecting column types from
# the catalog; the consumer is not visible in this part of the file.
ischema_names = {
    # array / key-value / JSON
    "_array": _array.ARRAY,
    "hstore": _hstore.HSTORE,
    "json": _json.JSON,
    "jsonb": _json.JSONB,
    # range types
    "int4range": _ranges.INT4RANGE,
    "int8range": _ranges.INT8RANGE,
    "numrange": _ranges.NUMRANGE,
    "daterange": _ranges.DATERANGE,
    "tsrange": _ranges.TSRANGE,
    "tstzrange": _ranges.TSTZRANGE,
    # integers
    "integer": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    # character data
    "character varying": VARCHAR,
    "character": CHAR,
    '"char"': sqltypes.String,
    "name": sqltypes.String,
    "text": TEXT,
    # numerics
    "numeric": NUMERIC,
    "float": FLOAT,
    "real": REAL,
    # network, identifier and miscellaneous types
    "inet": INET,
    "cidr": CIDR,
    "uuid": UUID,
    "bit": BIT,
    "bit varying": BIT,
    "macaddr": MACADDR,
    "money": MONEY,
    "oid": OID,
    "regclass": REGCLASS,
    "double precision": DOUBLE_PRECISION,
    # date and time
    "timestamp": TIMESTAMP,
    "timestamp with time zone": TIMESTAMP,
    "timestamp without time zone": TIMESTAMP,
    "time with time zone": TIME,
    "time without time zone": TIME,
    "date": DATE,
    "time": TIME,
    # binary, boolean, interval, text search
    "bytea": BYTEA,
    "boolean": BOOLEAN,
    "interval": INTERVAL,
    "tsvector": TSVECTOR,
}
class PGCompiler(compiler.SQLCompiler):
    """SQL statement compiler that renders PostgreSQL-specific syntax."""

    def visit_colon_cast(self, element, **kw):
        """Render ``<expression>::<type>`` for a colon-cast element."""
        expr_text = element.clause._compiler_dispatch(self, **kw)
        type_text = element.typeclause._compiler_dispatch(self, **kw)
        return "%s::%s" % (expr_text, type_text)

    def visit_array(self, element, **kw):
        """Render an array literal as ``ARRAY[...]``."""
        members = self.visit_clauselist(element, **kw)
        return "ARRAY[%s]" % members

    def visit_slice(self, element, **kw):
        """Render a slice as ``<start>:<stop>``."""
        lower = self.process(element.start, **kw)
        upper = self.process(element.stop, **kw)
        return "%s:%s" % (lower, upper)

    def visit_json_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON index access with ``->`` or ``->>``.

        If the result type is not JSON and no cast has been applied yet,
        the expression is re-processed wrapped in a CAST to its own type
        with ``_cast_applied`` set; that second pass renders ``->>``.
        """
        wrap_in_cast = (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        )
        if wrap_in_cast:
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        kw["eager_grouping"] = True
        opstring = " ->> " if _cast_applied else " -> "
        return self._generate_generic_binary(binary, opstring, **kw)

    def visit_json_path_getitem_op_binary(
        self, binary, operator, _cast_applied=False, **kw
    ):
        """Render JSON path access with ``#>`` or ``#>>``.

        If the result type is not JSON and no cast has been applied yet,
        the expression is re-processed wrapped in a CAST to its own type
        with ``_cast_applied`` set; that second pass renders ``#>>``.
        """
        wrap_in_cast = (
            not _cast_applied
            and binary.type._type_affinity is not sqltypes.JSON
        )
        if wrap_in_cast:
            kw["_cast_applied"] = True
            return self.process(sql.cast(binary, binary.type), **kw)

        kw["eager_grouping"] = True
        opstring = " #>> " if _cast_applied else " #> "
        return self._generate_generic_binary(binary, opstring, **kw)

    def visit_getitem_binary(self, binary, operator, **kw):
        """Render index access as ``<left>[<right>]``."""
        container = self.process(binary.left, **kw)
        index = self.process(binary.right, **kw)
        return "%s[%s]" % (container, index)

    def visit_aggregate_order_by(self, element, **kw):
        """Render ``<target> ORDER BY <order_by>``."""
        target_text = self.process(element.target, **kw)
        order_text = self.process(element.order_by, **kw)
        return "%s ORDER BY %s" % (target_text, order_text)

    def visit_match_op_binary(self, binary, operator, **kw):
        """Render a match as ``<left> @@ to_tsquery(...)``.

        A non-empty ``postgresql_regconfig`` modifier is rendered as a
        literal first argument to ``to_tsquery``.
        """
        regconfig = None
        if "postgresql_regconfig" in binary.modifiers:
            regconfig = self.render_literal_value(
                binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
            )

        document = self.process(binary.left, **kw)
        query = self.process(binary.right, **kw)
        if regconfig:
            return "%s @@ to_tsquery(%s, %s)" % (document, regconfig, query)
        return "%s @@ to_tsquery(%s)" % (document, query)

    def visit_ilike_op_binary(self, binary, operator, **kw):
        """Render ``ILIKE``, with an ``ESCAPE`` clause when a truthy
        ``escape`` modifier is present.
        """
        escape = binary.modifiers.get("escape", None)
        text = "%s ILIKE %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )
        if escape:
            text += " ESCAPE " + self.render_literal_value(
                escape, sqltypes.STRINGTYPE
            )
        return text

    def visit_not_ilike_op_binary(self, binary, operator, **kw):
        """Render ``NOT ILIKE``, with an ``ESCAPE`` clause when a truthy
        ``escape`` modifier is present.
        """
        escape = binary.modifiers.get("escape", None)
        text = "%s NOT ILIKE %s" % (
            self.process(binary.left, **kw),
            self.process(binary.right, **kw),
        )
        if escape:
            text += " ESCAPE " + self.render_literal_value(
                escape, sqltypes.STRINGTYPE
            )
        return text

    def _regexp_match(self, base_op, binary, operator, kw):
        """Render a regular expression match using ``base_op``.

        Without flags the bare operator is used; when the flags are a
        bound parameter whose value is exactly ``"i"`` the operator gets
        a ``*`` suffix; any other flags are embedded into the pattern
        through a ``CONCAT('(?', <flags>, ')', <pattern>)`` expression.
        """
        flags = binary.modifiers["flags"]
        if flags is None:
            return self._generate_generic_binary(
                binary, " %s " % base_op, **kw
            )

        only_ignore_case = (
            isinstance(flags, elements.BindParameter) and flags.value == "i"
        )
        if only_ignore_case:
            return self._generate_generic_binary(
                binary, " %s* " % base_op, **kw
            )

        # flags are processed before the operands; keep this order
        flags_text = self.process(flags, **kw)
        string_text = self.process(binary.left, **kw)
        pattern_text = self.process(binary.right, **kw)
        return "%s %s CONCAT('(?', %s, ')', %s)" % (
            string_text,
            base_op,
            flags_text,
            pattern_text,
        )

    def visit_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a regular expression match with the ``~`` operator."""
        return self._regexp_match("~", binary, operator, kw)

    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
        """Render a negated regular expression match with ``!~``."""
        return self._regexp_match("!~", binary, operator, kw)

    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
        """Render ``REGEXP_REPLACE(...)``, appending the flags as a fourth
        argument only when they are present.
        """
        string_text = self.process(binary.left, **kw)
        pattern_text = self.process(binary.right, **kw)
        flags = binary.modifiers["flags"]
        flags_text = None if flags is None else self.process(flags, **kw)
        replacement_text = self.process(binary.modifiers["replacement"], **kw)

        if flags_text is not None:
            return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
                string_text,
                pattern_text,
                replacement_text,
                flags_text,
            )
        return "REGEXP_REPLACE(%s, %s, %s)" % (
            string_text,
            pattern_text,
            replacement_text,
        )

    def visit_empty_set_expr(self, element_types):
        """Render a SELECT that returns no rows, with typed columns."""
        # cast the empty set to the type we are comparing against.  if
        # we are comparing against the null type, pick an arbitrary
        # datatype for the empty set
        null_casts = [
            "CAST(NULL AS %s)"
            % self.dialect.type_compiler.process(
                INTEGER() if type_._isnull else type_
            )
            for type_ in element_types or [INTEGER()]
        ]
        return "SELECT %s WHERE 1!=1" % (", ".join(null_casts),)

    def render_literal_value(self, value, type_):
        """Render a literal, doubling backslashes when the dialect
        reports backslash escapes are in effect.
        """
        rendered = super(PGCompiler, self).render_literal_value(value, type_)
        if not self.dialect._backslash_escapes:
            return rendered
        return rendered.replace("\\", "\\\\")

    def visit_sequence(self, seq, **kw):
        """Render ``nextval('<sequence>')``."""
        sequence_name = self.preparer.format_sequence(seq)
        return "nextval('%s')" % sequence_name

    def limit_clause(self, select, **kw):
        """Render LIMIT / OFFSET; an OFFSET without a LIMIT is preceded
        by ``LIMIT ALL``.
        """
        limit = select._limit_clause
        offset = select._offset_clause

        pieces = []
        if limit is not None:
            pieces.append(" \n LIMIT " + self.process(limit, **kw))
        if offset is not None:
            if limit is None:
                pieces.append("\n LIMIT ALL")
            pieces.append(" OFFSET " + self.process(offset, **kw))
        return "".join(pieces)

    def format_from_hint_text(self, sqltext, table, hint, iscrud):
        """Apply a FROM hint; only ``ONLY`` (case-insensitive) is accepted.

        :raises CompileError: for any other hint.
        """
        if hint.upper() == "ONLY":
            return "ONLY " + sqltext
        raise exc.CompileError("Unrecognized hint: %r" % hint)

    def get_select_precolumns(self, select, **kw):
        """Render ``DISTINCT ON (...)``, ``DISTINCT`` or nothing."""
        # Do not call super().get_select_precolumns because
        # it will warn/raise when distinct on is present
        if select._distinct_on:
            distinct_cols = [
                self.process(col, **kw) for col in select._distinct_on
            ]
            return "DISTINCT ON (" + ", ".join(distinct_cols) + ") "
        if select._distinct:
            return "DISTINCT "
        return ""

    def for_update_clause(self, select, **kw):
        """Render the row-locking clause with its OF / NOWAIT /
        SKIP LOCKED modifiers.
        """
        arg = select._for_update_arg

        if arg.read:
            tmp = " FOR KEY SHARE" if arg.key_share else " FOR SHARE"
        elif arg.key_share:
            tmp = " FOR NO KEY UPDATE"
        else:
            tmp = " FOR UPDATE"

        if arg.of:
            # collect the selectables behind each OF element, in order
            # and without duplicates
            tables = util.OrderedSet()
            for c in arg.of:
                tables.update(sql_util.surface_selectables_only(c))

            tmp += " OF " + ", ".join(
                self.process(table, ashint=True, use_schema=False, **kw)
                for table in tables
            )

        if arg.nowait:
            tmp += " NOWAIT"
        if arg.skip_locked:
            tmp += " SKIP LOCKED"

        return tmp

    def returning_clause(self, stmt, returning_cols):
        """Render ``RETURNING`` followed by the labeled columns."""
        labeled = [
            self._label_returning_column(stmt, c)
            for c in expression._select_iterables(returning_cols)
        ]
        return "RETURNING " + ", ".join(labeled)

    def visit_substring_func(self, func, **kw):
        """Render ``SUBSTRING(<s> FROM <start> [FOR <length>])``."""
        args = func.clauses.clauses
        s = self.process(args[0], **kw)
        start = self.process(args[1], **kw)
        if len(args) <= 2:
            return "SUBSTRING(%s FROM %s)" % (s, start)
        length = self.process(args[2], **kw)
        return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)

    def _on_conflict_target(self, clause, **kw):
        """Render the conflict target of an ON CONFLICT clause.

        Returns ``ON CONSTRAINT <name>`` for a named constraint target, a
        parenthesized element list (optionally followed by WHERE) for
        inferred targets, or an empty string when neither is given.
        """
        if clause.constraint_target is not None:
            # target may be a name of an Index, UniqueConstraint or
            # ExcludeConstraint.  While there is a separate
            # "max_identifier_length" for indexes, PostgreSQL uses the same
            # length for all objects so we can use
            # truncate_and_render_constraint_name
            return (
                "ON CONSTRAINT %s"
                % self.preparer.truncate_and_render_constraint_name(
                    clause.constraint_target
                )
            )

        if clause.inferred_target_elements is None:
            return ""

        rendered = []
        for elem in clause.inferred_target_elements:
            if isinstance(elem, util.string_types):
                rendered.append(self.preparer.quote(elem))
            else:
                rendered.append(
                    self.process(elem, include_table=False, use_schema=False)
                )
        target_text = "(%s)" % ", ".join(rendered)

        if clause.inferred_target_whereclause is not None:
            target_text += " WHERE %s" % self.process(
                clause.inferred_target_whereclause,
                include_table=False,
                use_schema=False,
            )
        return target_text

    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
        """Render ``ON CONFLICT [<target>] DO NOTHING``."""
        target_text = self._on_conflict_target(on_conflict, **kw)
        if not target_text:
            return "ON CONFLICT DO NOTHING"
        return "ON CONFLICT %s DO NOTHING" % target_text

    def visit_on_conflict_do_update(self, on_conflict, **kw):
        """Render ``ON CONFLICT <target> DO UPDATE SET ...``.

        Assignments are emitted in the order of the target table's
        columns; keys that match no column produce a warning and are
        appended afterwards.
        """
        clause = on_conflict
        target_text = self._on_conflict_target(on_conflict, **kw)

        action_set_ops = []
        # work on a copy; entries are popped as they are matched
        set_parameters = dict(clause.update_values_to_set)

        # create a list of column assignment clauses as tuples
        insert_statement = self.stack[-1]["selectable"]
        for c in insert_statement.table.c:
            col_key = c.key

            # a parameter may be keyed by column key or by the column
            if col_key in set_parameters:
                value = set_parameters.pop(col_key)
            elif c in set_parameters:
                value = set_parameters.pop(c)
            else:
                continue

            if coercions._is_literal(value):
                value = elements.BindParameter(None, value, type_=c.type)
            elif (
                isinstance(value, elements.BindParameter)
                and value.type._isnull
            ):
                # untyped bind: give a copy the column's type
                value = value._clone()
                value.type = c.type

            value_text = self.process(value.self_group(), use_schema=False)
            key_text = self.preparer.quote(col_key)
            action_set_ops.append("%s = %s" % (key_text, value_text))

        # check for names that don't match columns
        if set_parameters:
            util.warn(
                "Additional column names not matching "
                "any column keys in table '%s': %s"
                % (
                    self.current_executable.table.name,
                    (", ".join("'%s'" % c for c in set_parameters)),
                )
            )
            for k, v in set_parameters.items():
                if isinstance(k, util.string_types):
                    key_text = self.preparer.quote(k)
                else:
                    key_text = self.process(k, use_schema=False)
                value_text = self.process(
                    coercions.expect(roles.ExpressionElementRole, v),
                    use_schema=False,
                )
                action_set_ops.append("%s = %s" % (key_text, value_text))

        action_text = ", ".join(action_set_ops)
        if clause.update_whereclause is not None:
            action_text += " WHERE %s" % self.process(
                clause.update_whereclause, include_table=True, use_schema=False
            )

        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)

    def update_from_clause(
        self, update_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the ``FROM`` clause of an UPDATE with extra FROM
        elements.
        """
        kw["asfrom"] = True
        rendered = [
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        ]
        return "FROM " + ", ".join(rendered)

    def delete_extra_from_clause(
        self, delete_stmt, from_table, extra_froms, from_hints, **kw
    ):
        """Render the DELETE .. USING clause specific to PostgreSQL."""
        kw["asfrom"] = True
        rendered = [
            t._compiler_dispatch(self, fromhints=from_hints, **kw)
            for t in extra_froms
        ]
        return "USING " + ", ".join(rendered)

    def fetch_clause(self, select, **kw):
        """Render ``OFFSET (...) ROWS`` and / or
        ``FETCH FIRST (...) [PERCENT] ROWS {ONLY | WITH TIES}``.
        """
        # pg requires parens for non literal clauses. It's also required for
        # bind parameters if a ::type casts is used by the driver (asyncpg),
        # so it's easiest to just always add it
        pieces = []

        offset = select._offset_clause
        if offset is not None:
            pieces.append("\n OFFSET (%s) ROWS" % self.process(offset, **kw))

        fetch = select._fetch_clause
        if fetch is not None:
            count_text = self.process(fetch, **kw)
            options = select._fetch_clause_options
            percent_text = " PERCENT" if options["percent"] else ""
            ties_text = "WITH TIES" if options["with_ties"] else "ONLY"
            pieces.append(
                "\n FETCH FIRST (%s)%s ROWS %s"
                % (count_text, percent_text, ties_text)
            )
        return "".join(pieces)
- class PGDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
    """Build the definition text for one column.

    A primary-key autoincrement column with no identity and no default
    (or only an optional Sequence default) gets a SERIAL / BIGSERIAL /
    SMALLSERIAL pseudo-type; SMALLSERIAL additionally requires that the
    dialect supports it.  Any other column gets its compiled type plus
    an optional DEFAULT.  Computed, identity and NULL / NOT NULL parts
    are appended afterwards.
    """
    colspec = self.preparer.format_column(column)

    # look through a TypeDecorator to the type it wraps
    impl_type = column.type.dialect_impl(self.dialect)
    if isinstance(impl_type, sqltypes.TypeDecorator):
        impl_type = impl_type.impl

    has_identity = (
        column.identity is not None
        and self.dialect.supports_identity_columns
    )

    # kept as a single short-circuiting expression so the later terms
    # are only evaluated when the earlier ones hold
    use_serial = (
        column.primary_key
        and column is column.table._autoincrement_column
        and (
            self.dialect.supports_smallserial
            or not isinstance(impl_type, sqltypes.SmallInteger)
        )
        and not has_identity
        and (
            column.default is None
            or (
                isinstance(column.default, schema.Sequence)
                and column.default.optional
            )
        )
    )

    if use_serial:
        if isinstance(impl_type, sqltypes.BigInteger):
            colspec += " BIGSERIAL"
        elif isinstance(impl_type, sqltypes.SmallInteger):
            colspec += " SMALLSERIAL"
        else:
            colspec += " SERIAL"
    else:
        type_text = self.dialect.type_compiler.process(
            column.type,
            type_expression=column,
            identifier_preparer=self.preparer,
        )
        colspec += " " + type_text
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

    if column.computed is not None:
        colspec += " " + self.process(column.computed)
    if has_identity:
        colspec += " " + self.process(column.identity)

    # NOT NULL is left off identity columns; an explicit NULL is added
    # only for a nullable identity column
    if not column.nullable and not has_identity:
        colspec += " NOT NULL"
    elif column.nullable and has_identity:
        colspec += " NULL"
    return colspec
- def visit_check_constraint(self, constraint):
- if constraint._type_bound:
- typ = list(constraint.columns)[0].type
- if (
- isinstance(typ, sqltypes.ARRAY)
- and isinstance(typ.item_type, sqltypes.Enum)
- and not typ.item_type.native_enum
- ):
- raise exc.CompileError(
- "PostgreSQL dialect cannot produce the CHECK constraint "
- "for ARRAY of non-native ENUM; please specify "
- "create_constraint=False on this Enum datatype."
- )
- return super(PGDDLCompiler, self).visit_check_constraint(constraint)
- def visit_drop_table_comment(self, drop):
- return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
- drop.element
- )
- def visit_create_enum_type(self, create):
- type_ = create.element
- return "CREATE TYPE %s AS ENUM (%s)" % (
- self.preparer.format_type(type_),
- ", ".join(
- self.sql_compiler.process(sql.literal(e), literal_binds=True)
- for e in type_.enums
- ),
- )
- def visit_drop_enum_type(self, drop):
- type_ = drop.element
- return "DROP TYPE %s" % (self.preparer.format_type(type_))
- def visit_create_index(self, create):
- preparer = self.preparer
- index = create.element
- self._verify_index_table(index)
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
- text += "INDEX "
- if self.dialect._supports_create_index_concurrently:
- concurrently = index.dialect_options["postgresql"]["concurrently"]
- if concurrently:
- text += "CONCURRENTLY "
- if create.if_not_exists:
- text += "IF NOT EXISTS "
- text += "%s ON %s " % (
- self._prepared_index_name(index, include_schema=False),
- preparer.format_table(index.table),
- )
- using = index.dialect_options["postgresql"]["using"]
- if using:
- text += (
- "USING %s "
- % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
- )
- ops = index.dialect_options["postgresql"]["ops"]
- text += "(%s)" % (
- ", ".join(
- [
- self.sql_compiler.process(
- expr.self_group()
- if not isinstance(expr, expression.ColumnClause)
- else expr,
- include_table=False,
- literal_binds=True,
- )
- + (
- (" " + ops[expr.key])
- if hasattr(expr, "key") and expr.key in ops
- else ""
- )
- for expr in index.expressions
- ]
- )
- )
- includeclause = index.dialect_options["postgresql"]["include"]
- if includeclause:
- inclusions = [
- index.table.c[col]
- if isinstance(col, util.string_types)
- else col
- for col in includeclause
- ]
- text += " INCLUDE (%s)" % ", ".join(
- [preparer.quote(c.name) for c in inclusions]
- )
- withclause = index.dialect_options["postgresql"]["with"]
- if withclause:
- text += " WITH (%s)" % (
- ", ".join(
- [
- "%s = %s" % storage_parameter
- for storage_parameter in withclause.items()
- ]
- )
- )
- tablespace_name = index.dialect_options["postgresql"]["tablespace"]
- if tablespace_name:
- text += " TABLESPACE %s" % preparer.quote(tablespace_name)
- whereclause = index.dialect_options["postgresql"]["where"]
- if whereclause is not None:
- whereclause = coercions.expect(
- roles.DDLExpressionRole, whereclause
- )
- where_compiled = self.sql_compiler.process(
- whereclause, include_table=False, literal_binds=True
- )
- text += " WHERE " + where_compiled
- return text
- def visit_drop_index(self, drop):
- index = drop.element
- text = "\nDROP INDEX "
- if self.dialect._supports_drop_index_concurrently:
- concurrently = index.dialect_options["postgresql"]["concurrently"]
- if concurrently:
- text += "CONCURRENTLY "
- if drop.if_exists:
- text += "IF EXISTS "
- text += self._prepared_index_name(index, include_schema=True)
- return text
- def visit_exclude_constraint(self, constraint, **kw):
- text = ""
- if constraint.name is not None:
- text += "CONSTRAINT %s " % self.preparer.format_constraint(
- constraint
- )
- elements = []
- for expr, name, op in constraint._render_exprs:
- kw["include_table"] = False
- exclude_element = self.sql_compiler.process(expr, **kw) + (
- (" " + constraint.ops[expr.key])
- if hasattr(expr, "key") and expr.key in constraint.ops
- else ""
- )
- elements.append("%s WITH %s" % (exclude_element, op))
- text += "EXCLUDE USING %s (%s)" % (
- self.preparer.validate_sql_phrase(
- constraint.using, IDX_USING
- ).lower(),
- ", ".join(elements),
- )
- if constraint.where is not None:
- text += " WHERE (%s)" % self.sql_compiler.process(
- constraint.where, literal_binds=True
- )
- text += self.define_constraint_deferrability(constraint)
- return text
- def post_create_table(self, table):
- table_opts = []
- pg_opts = table.dialect_options["postgresql"]
- inherits = pg_opts.get("inherits")
- if inherits is not None:
- if not isinstance(inherits, (list, tuple)):
- inherits = (inherits,)
- table_opts.append(
- "\n INHERITS ( "
- + ", ".join(self.preparer.quote(name) for name in inherits)
- + " )"
- )
- if pg_opts["partition_by"]:
- table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
- if pg_opts["with_oids"] is True:
- table_opts.append("\n WITH OIDS")
- elif pg_opts["with_oids"] is False:
- table_opts.append("\n WITHOUT OIDS")
- if pg_opts["on_commit"]:
- on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
- table_opts.append("\n ON COMMIT %s" % on_commit_options)
- if pg_opts["tablespace"]:
- tablespace_name = pg_opts["tablespace"]
- table_opts.append(
- "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
- )
- return "".join(table_opts)
- def visit_computed_column(self, generated):
- if generated.persisted is False:
- raise exc.CompileError(
- "PostrgreSQL computed columns do not support 'virtual' "
- "persistence; set the 'persisted' flag to None or True for "
- "PostgreSQL support."
- )
- return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
- generated.sqltext, include_table=False, literal_binds=True
- )
- def visit_create_sequence(self, create, **kw):
- prefix = None
- if create.element.data_type is not None:
- prefix = " AS %s" % self.type_compiler.process(
- create.element.data_type
- )
- return super(PGDDLCompiler, self).visit_create_sequence(
- create, prefix=prefix, **kw
- )
class PGTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler rendering PostgreSQL type names for DDL and casts."""

    # -- types rendered as a fixed keyword --------------------------------

    def visit_TSVECTOR(self, type_, **kw):
        return "TSVECTOR"

    def visit_INET(self, type_, **kw):
        return "INET"

    def visit_CIDR(self, type_, **kw):
        return "CIDR"

    def visit_MACADDR(self, type_, **kw):
        return "MACADDR"

    def visit_MONEY(self, type_, **kw):
        return "MONEY"

    def visit_OID(self, type_, **kw):
        return "OID"

    def visit_REGCLASS(self, type_, **kw):
        return "REGCLASS"

    def visit_FLOAT(self, type_, **kw):
        """Render FLOAT, with ``(precision)`` only for a truthy precision."""
        if type_.precision:
            return "FLOAT(%(precision)s)" % {"precision": type_.precision}
        return "FLOAT"

    def visit_DOUBLE_PRECISION(self, type_, **kw):
        return "DOUBLE PRECISION"

    def visit_BIGINT(self, type_, **kw):
        return "BIGINT"

    def visit_HSTORE(self, type_, **kw):
        return "HSTORE"

    def visit_JSON(self, type_, **kw):
        return "JSON"

    def visit_JSONB(self, type_, **kw):
        return "JSONB"

    def visit_INT4RANGE(self, type_, **kw):
        return "INT4RANGE"

    def visit_INT8RANGE(self, type_, **kw):
        return "INT8RANGE"

    def visit_NUMRANGE(self, type_, **kw):
        return "NUMRANGE"

    def visit_DATERANGE(self, type_, **kw):
        return "DATERANGE"

    def visit_TSRANGE(self, type_, **kw):
        return "TSRANGE"

    def visit_TSTZRANGE(self, type_, **kw):
        return "TSTZRANGE"

    def visit_datetime(self, type_, **kw):
        """Generic datetime is rendered exactly like TIMESTAMP."""
        return self.visit_TIMESTAMP(type_, **kw)

    def visit_enum(self, type_, **kw):
        """Render a native ENUM when both the type and the dialect allow it,
        otherwise defer to the generic implementation."""
        if type_.native_enum and self.dialect.supports_native_enum:
            return self.visit_ENUM(type_, **kw)
        return super(PGTypeCompiler, self).visit_enum(type_, **kw)

    def visit_ENUM(self, type_, identifier_preparer=None, **kw):
        """Render the (optionally schema-qualified) name of a native ENUM."""
        preparer = identifier_preparer
        if preparer is None:
            preparer = self.dialect.identifier_preparer
        return preparer.format_type(type_)

    def visit_TIMESTAMP(self, type_, **kw):
        """Render ``TIMESTAMP[(p)] WITH|WITHOUT TIME ZONE``."""
        precision = ""
        if getattr(type_, "precision", None) is not None:
            precision = "(%d)" % type_.precision
        zone = "WITH" if type_.timezone else "WITHOUT"
        return "TIMESTAMP%s %s" % (precision, zone + " TIME ZONE")

    def visit_TIME(self, type_, **kw):
        """Render ``TIME[(p)] WITH|WITHOUT TIME ZONE``."""
        precision = ""
        if getattr(type_, "precision", None) is not None:
            precision = "(%d)" % type_.precision
        zone = "WITH" if type_.timezone else "WITHOUT"
        return "TIME%s %s" % (precision, zone + " TIME ZONE")

    def visit_INTERVAL(self, type_, **kw):
        """Render INTERVAL with optional fields and precision."""
        tokens = ["INTERVAL"]
        if type_.fields is not None:
            tokens.append(" " + type_.fields)
        if type_.precision is not None:
            tokens.append(" (%d)" % type_.precision)
        return "".join(tokens)

    def visit_BIT(self, type_, **kw):
        """Render ``BIT(n)``, or ``BIT VARYING[(n)]`` for varying types."""
        if not type_.varying:
            return "BIT(%d)" % type_.length

        rendered = "BIT VARYING"
        if type_.length is not None:
            rendered += "(%d)" % type_.length
        return rendered

    def visit_UUID(self, type_, **kw):
        return "UUID"

    def visit_large_binary(self, type_, **kw):
        """Generic large binary is rendered as BYTEA."""
        return self.visit_BYTEA(type_, **kw)

    def visit_BYTEA(self, type_, **kw):
        return "BYTEA"

    def visit_ARRAY(self, type_, **kw):
        """Render the item type followed by one ``[]`` per dimension.

        The brackets are inserted before a trailing ``COLLATE ...`` of the
        item type, if there is one.
        """
        item_sql = self.process(type_.item_type, **kw)
        dimensions = 1 if type_.dimensions is None else type_.dimensions
        brackets = "[]" * dimensions
        return re.sub(
            r"((?: COLLATE.*)?)$",
            r"%s\1" % brackets,
            item_sql,
            count=1,
        )
class PGIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier quoting rules using the PostgreSQL reserved word list."""

    reserved_words = RESERVED_WORDS

    def _unquote_identifier(self, value):
        """Strip surrounding quotes from ``value`` and collapse escaped quotes.

        Values that do not begin with the quote character are returned
        unchanged.
        """
        if value[0] != self.initial_quote:
            return value
        inner = value[1:-1]
        return inner.replace(self.escape_to_quote, self.escape_quote)

    def format_type(self, type_, use_schema=True):
        """Return the quoted name of a named type, schema-qualified if needed.

        :raises CompileError: if the type has no name.
        """
        if not type_.name:
            raise exc.CompileError("PostgreSQL ENUM type requires a name.")

        quoted_name = self.quote(type_.name)
        type_schema = self.schema_for_object(type_)

        if self.omit_schema or not use_schema or type_schema is None:
            return quoted_name
        return self.quote_schema(type_schema) + "." + quoted_name
class PGInspector(reflection.Inspector):
    """Inspector exposing PostgreSQL-specific reflection helpers."""

    def get_table_oid(self, table_name, schema=None):
        """Return the OID for the given table name."""
        with self._operation_context() as conn:
            oid = self.dialect.get_table_oid(
                conn, table_name, schema, info_cache=self.info_cache
            )
        return oid

    def get_enums(self, schema=None):
        """Return a list of ENUM objects.

        Each member is a dictionary containing these fields:

            * name - name of the enum
            * schema - the schema name for the enum.
            * visible - boolean, whether or not this enum is visible
              in the default search path.
            * labels - a list of string labels that apply to the enum.

        :param schema: schema name.  If None, the default schema
         (typically 'public') is used.  May also be set to '*' to
         indicate load enums for all schemas.

        .. versionadded:: 1.0.0

        """
        if not schema:
            schema = self.default_schema_name
        with self._operation_context() as conn:
            enums = self.dialect._load_enums(conn, schema)
        return enums

    def get_foreign_table_names(self, schema=None):
        """Return a list of FOREIGN TABLE names.

        Behavior is similar to that of
        :meth:`_reflection.Inspector.get_table_names`,
        except that the list is limited to those tables that report a
        ``relkind`` value of ``f``.

        .. versionadded:: 1.0.0

        """
        if not schema:
            schema = self.default_schema_name
        with self._operation_context() as conn:
            names = self.dialect._get_foreign_table_names(conn, schema)
        return names

    def get_view_names(self, schema=None, include=("plain", "materialized")):
        """Return all view names in `schema`.

        :param schema: Optional, retrieve names from a non-default schema.
         For special quoting, use :class:`.quoted_name`.

        :param include: specify which types of views to return.  Passed
         as a string value (for a single type) or a tuple (for any number
         of types).  Defaults to ``('plain', 'materialized')``.

         .. versionadded:: 1.1

        """
        with self._operation_context() as conn:
            names = self.dialect.get_view_names(
                conn, schema, info_cache=self.info_cache, include=include
            )
        return names
class CreateEnumType(schema._CreateDropBase):
    """DDL element for creating a named ENUM type.

    Its visit name routes compilation to
    ``PGDDLCompiler.visit_create_enum_type``, which renders
    ``CREATE TYPE <name> AS ENUM (...)``.
    """

    __visit_name__ = "create_enum_type"
class DropEnumType(schema._CreateDropBase):
    """DDL element for dropping a named ENUM type.

    Its visit name routes compilation to
    ``PGDDLCompiler.visit_drop_enum_type``, which renders
    ``DROP TYPE <name>``.
    """

    __visit_name__ = "drop_enum_type"
class PGExecutionContext(default.DefaultExecutionContext):
    """Execution context that pre-executes sequences and PK defaults."""

    def fire_sequence(self, seq, type_):
        """Fetch the next value of ``seq`` via ``select nextval(...)``."""
        statement = (
            "select nextval('%s')"
            % self.identifier_preparer.format_sequence(seq)
        )
        return self._execute_scalar(statement, type_)

    def get_insert_default(self, column):
        """Return the INSERT default for ``column``.

        For the autoincrementing primary key column the value is produced
        up front: either by evaluating the column's server default, or by
        calling ``nextval()`` on the sequence that backs a SERIAL column.
        Every other case is delegated to the base implementation.
        """
        is_autoinc_pk = (
            column.primary_key
            and column is column.table._autoincrement_column
        )
        if not is_autoinc_pk:
            return super(PGExecutionContext, self).get_insert_default(column)

        if column.server_default and column.server_default.has_argument:
            # pre-execute passive defaults on primary key columns
            return self._execute_scalar(
                "select %s" % column.server_default.arg, column.type
            )

        implicit_sequence = column.default is None or (
            column.default.is_sequence and column.default.optional
        )
        if not implicit_sequence:
            return super(PGExecutionContext, self).get_insert_default(column)

        # execute the sequence associated with a SERIAL primary
        # key column. for non-primary-key SERIAL, the ID just
        # generates server side.
        try:
            seq_name = column._postgresql_seq_name
        except AttributeError:
            # derive "<table>_<column>_seq", truncating both parts, and
            # memoize the result on the column
            table_part = column.table.name
            column_part = column.name
            table_part = table_part[: 29 + max(0, 29 - len(column_part))]
            column_part = column_part[: 29 + max(0, 29 - len(table_part))]
            seq_name = "%s_%s_seq" % (table_part, column_part)
            column._postgresql_seq_name = seq_name

        effective_schema = None
        if column.table is not None:
            effective_schema = self.connection.schema_for_object(column.table)

        if effective_schema is None:
            statement = "select nextval('\"%s\"')" % (seq_name,)
        else:
            statement = 'select nextval(\'"%s"."%s"\')' % (
                effective_schema,
                seq_name,
            )

        return self._execute_scalar(statement, column.type)

    def should_autocommit_text(self, statement):
        """Return the AUTOCOMMIT_REGEXP match for ``statement`` (None if no
        match)."""
        return AUTOCOMMIT_REGEXP.match(statement)
class PGReadOnlyConnectionCharacteristic(
    characteristics.ConnectionCharacteristic
):
    """Connection characteristic for the read-only flag.

    Registered on ``PGDialect.connection_characteristics`` under the key
    ``"postgresql_readonly"``; every operation delegates to the dialect's
    ``set_readonly`` / ``get_readonly``.
    """

    # NOTE(review): presumably marks the setting as transaction-scoped;
    # confirm against ConnectionCharacteristic.
    transactional = True

    def reset_characteristic(self, dialect, dbapi_conn):
        """Reset the connection to not read-only."""
        dialect.set_readonly(dbapi_conn, False)

    def set_characteristic(self, dialect, dbapi_conn, value):
        """Apply ``value`` through ``dialect.set_readonly``."""
        dialect.set_readonly(dbapi_conn, value)

    def get_characteristic(self, dialect, dbapi_conn):
        """Return the value reported by ``dialect.get_readonly``."""
        return dialect.get_readonly(dbapi_conn)
class PGDeferrableConnectionCharacteristic(
    characteristics.ConnectionCharacteristic
):
    """Connection characteristic for the deferrable flag.

    Registered on ``PGDialect.connection_characteristics`` under the key
    ``"postgresql_deferrable"``; every operation delegates to the dialect's
    ``set_deferrable`` / ``get_deferrable``.
    """

    # NOTE(review): presumably marks the setting as transaction-scoped;
    # confirm against ConnectionCharacteristic.
    transactional = True

    def reset_characteristic(self, dialect, dbapi_conn):
        """Reset the connection to not deferrable."""
        dialect.set_deferrable(dbapi_conn, False)

    def set_characteristic(self, dialect, dbapi_conn, value):
        """Apply ``value`` through ``dialect.set_deferrable``."""
        dialect.set_deferrable(dbapi_conn, value)

    def get_characteristic(self, dialect, dbapi_conn):
        """Return the value reported by ``dialect.get_deferrable``."""
        return dialect.get_deferrable(dbapi_conn)
- class PGDialect(default.DefaultDialect):
# Dialect name; also the key under which the compilers read the
# dialect-specific options (``dialect_options["postgresql"]``).
name = "postgresql"
supports_statement_cache = True
supports_alter = True
max_identifier_length = 63
supports_sane_rowcount = True

# Re-evaluated in initialize(): True only for server version >= (8, 3).
supports_native_enum = True
supports_native_boolean = True
# Re-evaluated in initialize(): True only for server version >= (9, 2).
supports_smallserial = True

supports_sequences = True
sequences_optional = True
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False

supports_comments = True
supports_default_values = True
supports_default_metavalue = True
supports_empty_insert = False
supports_multivalues_insert = True
# Re-evaluated in initialize(): True only for server version >= (10,).
supports_identity_columns = True
default_paramstyle = "pyformat"
ischema_names = ischema_names
colspecs = colspecs

# Component classes used by this dialect.
statement_compiler = PGCompiler
ddl_compiler = PGDDLCompiler
type_compiler = PGTypeCompiler
preparer = PGIdentifierPreparer
execution_ctx_cls = PGExecutionContext
inspector = PGInspector
# Overwritten per instance by __init__ with the ``isolation_level`` argument.
isolation_level = None

# Both switched off in initialize() when the server version is <= (8, 2).
implicit_returning = True
full_returning = True

# Start from the default characteristics and add the two PostgreSQL
# specific ones (read-only and deferrable).
connection_characteristics = (
    default.DefaultDialect.connection_characteristics
)
connection_characteristics = connection_characteristics.union(
    {
        "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
        "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
    }
)

# Defaults for the ``postgresql_*`` options accepted by Index and Table;
# PGDDLCompiler reads them in visit_create_index / post_create_table.
construct_arguments = [
    (
        schema.Index,
        {
            "using": False,
            "include": None,
            "where": None,
            "ops": {},
            "concurrently": False,
            "with": {},
            "tablespace": None,
        },
    ),
    (
        schema.Table,
        {
            "ignore_search_path": False,
            "tablespace": None,
            "partition_by": None,
            "with_oids": None,
            "on_commit": None,
            "inherits": None,
        },
    ),
]

reflection_options = ("postgresql_ignore_search_path",)

# The three flags below are recomputed in initialize() from the server
# version (and, for backslash escapes, from standard_conforming_strings).
_backslash_escapes = True
_supports_create_index_concurrently = True
_supports_drop_index_concurrently = True
def __init__(
    self,
    isolation_level=None,
    json_serializer=None,
    json_deserializer=None,
    **kwargs
):
    """Construct the dialect.

    :param isolation_level: legacy way of choosing an isolation level;
     when not None, ``on_connect()`` applies it to each new connection.
    :param json_serializer: stored as ``_json_serializer``.
    :param json_deserializer: stored as ``_json_deserializer``.
    :param kwargs: forwarded unchanged to ``DefaultDialect.__init__``.
    """
    default.DefaultDialect.__init__(self, **kwargs)

    # the isolation_level parameter to the PGDialect itself is legacy.
    # still works however the execution_options method is the one that
    # is documented.
    self.isolation_level = isolation_level
    self._json_deserializer = json_deserializer
    self._json_serializer = json_serializer
def initialize(self, connection):
    """Adjust the dialect's feature flags to the connected server version.

    Runs the base initialization first, then derives RETURNING, native
    ENUM, SMALLSERIAL, backslash-escape, CONCURRENTLY and identity-column
    support from ``server_version_info``.
    """
    super(PGDialect, self).initialize(connection)
    version = self.server_version_info

    if version <= (8, 2):
        self.implicit_returning = False
        self.full_returning = False

    native_enum = version >= (8, 3)
    self.supports_native_enum = native_enum
    if not native_enum:
        # work on a copy so the class-level mapping stays untouched
        trimmed = self.colspecs.copy()
        # pop base Enum type
        trimmed.pop(sqltypes.Enum, None)
        # psycopg2, others may have placed ENUM here as well
        trimmed.pop(ENUM, None)
        self.colspecs = trimmed

    # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
    self.supports_smallserial = version >= (9, 2)

    if version >= (8, 2):
        setting = connection.exec_driver_sql(
            "show standard_conforming_strings"
        ).scalar()
        self._backslash_escapes = setting == "off"
    else:
        # the query above must not be emitted on server version < 8.2
        # as it will fail
        self._backslash_escapes = False

    self._supports_create_index_concurrently = version >= (8, 2)
    self._supports_drop_index_concurrently = version >= (9, 2)
    self.supports_identity_columns = version >= (10,)
def on_connect(self):
    """Return a per-connection setup hook, or None if nothing is needed.

    A hook is only produced when an isolation level was configured on the
    dialect; it applies the dialect's current ``isolation_level`` to each
    new DBAPI connection.
    """
    if self.isolation_level is None:
        return None

    def connect(conn):
        self.set_isolation_level(conn, self.isolation_level)

    return connect
- _isolation_lookup = set(
- [
- "SERIALIZABLE",
- "READ UNCOMMITTED",
- "READ COMMITTED",
- "REPEATABLE READ",
- ]
- )
def set_isolation_level(self, connection, level):
    """Set the session-wide default transaction isolation level.

    :param connection: DBAPI connection; a cursor is opened on it.
    :param level: isolation level name; underscores are treated as
     spaces, so ``"READ_COMMITTED"`` equals ``"READ COMMITTED"``.
    :raises ArgumentError: if the level is not in ``_isolation_lookup``.
    """
    level = level.replace("_", " ")
    if level not in self._isolation_lookup:
        raise exc.ArgumentError(
            "Invalid value '%s' for isolation_level. "
            "Valid isolation levels for %s are %s"
            # sorted so that the message does not depend on set ordering
            % (level, self.name, ", ".join(sorted(self._isolation_lookup)))
        )
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SET SESSION CHARACTERISTICS AS TRANSACTION "
            "ISOLATION LEVEL %s" % level
        )
        cursor.execute("COMMIT")
    finally:
        # release the cursor even when one of the statements fails
        cursor.close()
def get_isolation_level(self, connection):
    """Return the current transaction isolation level, upper-cased.

    :param connection: DBAPI connection; a cursor is opened on it and is
     closed again even if the query fails.
    """
    cursor = connection.cursor()
    try:
        cursor.execute("show transaction isolation level")
        val = cursor.fetchone()[0]
    finally:
        cursor.close()
    return val.upper()
def set_readonly(self, connection, value):
    """Set the read-only flag on a DBAPI connection.

    Not implemented at this level: always raises NotImplementedError.
    Invoked by ``PGReadOnlyConnectionCharacteristic``; presumably
    overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def get_readonly(self, connection):
    """Return the read-only flag of a DBAPI connection.

    Not implemented at this level: always raises NotImplementedError.
    Invoked by ``PGReadOnlyConnectionCharacteristic``; presumably
    overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def set_deferrable(self, connection, value):
    """Set the deferrable flag on a DBAPI connection.

    Not implemented at this level: always raises NotImplementedError.
    Invoked by ``PGDeferrableConnectionCharacteristic``; presumably
    overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def get_deferrable(self, connection):
    """Return the deferrable flag of a DBAPI connection.

    Not implemented at this level: always raises NotImplementedError.
    Invoked by ``PGDeferrableConnectionCharacteristic``; presumably
    overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def do_begin_twophase(self, connection, xid):
    """Begin a two-phase transaction.

    Only a regular ``do_begin`` is issued on the underlying
    ``connection.connection``; ``xid`` is not used at this stage.
    """
    self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
    """Emit ``PREPARE TRANSACTION '<xid>'`` on the connection.

    NOTE(review): ``xid`` is interpolated into the statement text as is;
    confirm callers only pass trusted identifiers.
    """
    connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
def do_rollback_twophase(
    self, connection, xid, is_prepared=True, recover=False
):
    """Roll back a two-phase transaction.

    :param xid: identifier of the transaction.
    :param is_prepared: when True, ``ROLLBACK PREPARED`` is emitted for
     ``xid`` and a new ``BEGIN`` is issued afterwards.
    :param recover: when True (and prepared), a plain ``ROLLBACK`` is
     emitted first to leave the transaction the DBAPI has open.
    """
    if is_prepared:
        run = connection.exec_driver_sql
        if recover:
            # FIXME: ugly hack to get out of transaction
            # context when committing recoverable transactions
            # Must find out a way how to make the dbapi not
            # open a transaction.
            run("ROLLBACK")
        run("ROLLBACK PREPARED '%s'" % xid)
        run("BEGIN")

    # both paths finish with a rollback on the DBAPI connection
    self.do_rollback(connection.connection)
def do_commit_twophase(
    self, connection, xid, is_prepared=True, recover=False
):
    """Commit a two-phase transaction.

    :param xid: identifier of the transaction.
    :param is_prepared: when False, a regular commit is issued on the
     DBAPI connection; otherwise ``COMMIT PREPARED`` is emitted for
     ``xid``, followed by ``BEGIN`` and a rollback on the DBAPI connection.
    :param recover: when True (and prepared), a plain ``ROLLBACK`` is
     emitted first.
    """
    if not is_prepared:
        self.do_commit(connection.connection)
        return

    run = connection.exec_driver_sql
    if recover:
        run("ROLLBACK")
    run("COMMIT PREPARED '%s'" % xid)
    run("BEGIN")
    self.do_rollback(connection.connection)
def do_recover_twophase(self, connection):
    """Return the ``gid`` of every row in ``pg_prepared_xacts``."""
    statement = sql.text("SELECT gid FROM pg_prepared_xacts")
    gids = []
    for row in connection.execute(statement):
        gids.append(row[0])
    return gids
def _get_default_schema_name(self, connection):
    """Return the result of ``select current_schema()`` on ``connection``."""
    return connection.exec_driver_sql("select current_schema()").scalar()
def has_schema(self, connection, schema):
    """Return True if ``pg_namespace`` has a schema of this name.

    The comparison is case-insensitive: both sides are lower-cased.
    """
    schema_param = sql.bindparam(
        "schema",
        util.text_type(schema.lower()),
        type_=sqltypes.Unicode,
    )
    statement = sql.text(
        "select nspname from pg_namespace where lower(nspname)=:schema"
    ).bindparams(schema_param)
    return bool(connection.execute(statement).first())
def has_table(self, connection, table_name, schema=None):
    """Return True if a relation named ``table_name`` exists.

    Without ``schema`` the relation has to be visible on the search path;
    with ``schema`` it has to live in exactly that namespace.
    """
    self._ensure_has_table_connection(connection)

    name_param = sql.bindparam(
        "name",
        util.text_type(table_name),
        type_=sqltypes.Unicode,
    )

    # seems like case gets folded in pg_class...
    if schema is None:
        statement = sql.text(
            "select relname from pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where "
            "pg_catalog.pg_table_is_visible(c.oid) "
            "and relname=:name"
        ).bindparams(name_param)
    else:
        schema_param = sql.bindparam(
            "schema",
            util.text_type(schema),
            type_=sqltypes.Unicode,
        )
        statement = sql.text(
            "select relname from pg_class c join pg_namespace n on "
            "n.oid=c.relnamespace where n.nspname=:schema and "
            "relname=:name"
        ).bindparams(name_param, schema_param)

    return bool(connection.execute(statement).first())
def has_sequence(self, connection, sequence_name, schema=None):
    """Return True if a sequence (``relkind='S'``) of this name exists.

    When ``schema`` is None the dialect's default schema is searched.
    """
    target_schema = self.default_schema_name if schema is None else schema

    statement = sql.text(
        "SELECT relname FROM pg_class c join pg_namespace n on "
        "n.oid=c.relnamespace where relkind='S' and "
        "n.nspname=:schema and relname=:name"
    ).bindparams(
        sql.bindparam(
            "name",
            util.text_type(sequence_name),
            type_=sqltypes.Unicode,
        ),
        sql.bindparam(
            "schema",
            util.text_type(target_schema),
            type_=sqltypes.Unicode,
        ),
    )
    return bool(connection.execute(statement).first())
def has_type(self, connection, type_name, schema=None):
    """Return True if a type named ``type_name`` exists in ``pg_type``.

    With ``schema`` the type must belong to that namespace; without it
    the type must be visible on the search path.
    """
    params = [
        sql.bindparam(
            "typname", util.text_type(type_name), type_=sqltypes.Unicode
        )
    ]
    if schema is not None:
        raw_query = """
SELECT EXISTS (
SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
WHERE t.typnamespace = n.oid
AND t.typname = :typname
AND n.nspname = :nspname
)
"""
        params.append(
            sql.bindparam(
                "nspname", util.text_type(schema), type_=sqltypes.Unicode
            )
        )
    else:
        raw_query = """
SELECT EXISTS (
SELECT * FROM pg_catalog.pg_type t
WHERE t.typname = :typname
AND pg_type_is_visible(t.oid)
)
"""
    statement = sql.text(raw_query).bindparams(*params)
    return bool(connection.execute(statement).scalar())
- def _get_server_version_info(self, connection):
- v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
- m = re.match(
- r".*(?:PostgreSQL|EnterpriseDB) "
- r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
- v,
- )
- if not m:
- raise AssertionError(
- "Could not determine version from string '%s'" % v
- )
- return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
@reflection.cache
def get_table_oid(self, connection, table_name, schema=None, **kw):
    """Fetch the oid for schema.table_name.

    Several reflection methods require the table oid.  The idea for using
    this method is that it can be fetched one time and cached for
    subsequent calls.

    :raises NoSuchTableError: if no matching relation is found.
    """
    schema_where_clause = (
        "n.nspname = :schema"
        if schema is not None
        else "pg_catalog.pg_table_is_visible(c.oid)"
    )
    query = (
        """
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE (%s)
AND c.relname = :table_name AND c.relkind in
('r', 'v', 'm', 'f', 'p')
"""
        % schema_where_clause
    )

    # Since we're binding to unicode, table_name and schema_name must be
    # unicode.
    table_name = util.text_type(table_name)
    if schema is not None:
        schema = util.text_type(schema)

    statement = (
        sql.text(query)
        .bindparams(table_name=sqltypes.Unicode)
        .columns(oid=sqltypes.Integer)
    )
    if schema:
        statement = statement.bindparams(
            sql.bindparam("schema", type_=sqltypes.Unicode)
        )

    result = connection.execute(
        statement, {"table_name": table_name, "schema": schema}
    )
    oid = result.scalar()
    if oid is None:
        raise exc.NoSuchTableError(table_name)
    return oid
@reflection.cache
def get_schema_names(self, connection, **kw):
    """Return all schema names except those starting with ``pg_``, sorted."""
    statement = sql.text(
        "SELECT nspname FROM pg_namespace "
        "WHERE nspname NOT LIKE 'pg_%' "
        "ORDER BY nspname"
    ).columns(nspname=sqltypes.Unicode)
    return [row[0] for row in connection.execute(statement)]
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
    """Return names of relations with relkind ``r`` or ``p`` in ``schema``.

    The dialect's default schema is used when ``schema`` is None.
    """
    if schema is None:
        schema = self.default_schema_name
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
    ).columns(relname=sqltypes.Unicode)
    rows = connection.execute(statement, {"schema": schema})
    return [row[0] for row in rows]
@reflection.cache
def _get_foreign_table_names(self, connection, schema=None, **kw):
    """Return names of relations with relkind ``f`` in ``schema``.

    The dialect's default schema is used when ``schema`` is None.
    """
    if schema is None:
        schema = self.default_schema_name
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind = 'f'"
    ).columns(relname=sqltypes.Unicode)
    rows = connection.execute(statement, {"schema": schema})
    return [row[0] for row in rows]
@reflection.cache
def get_view_names(
    self, connection, schema=None, include=("plain", "materialized"), **kw
):
    """Return view names in ``schema``, filtered by view kind.

    :param include: ``"plain"``, ``"materialized"`` or a sequence holding
     one or both of them.
    :raises ValueError: for an unknown or an empty ``include``.
    """
    relkind_for = {"plain": "v", "materialized": "m"}

    kinds = []
    try:
        for requested in util.to_list(include):
            kinds.append(relkind_for[requested])
    except KeyError:
        raise ValueError(
            "include %r unknown, needs to be a sequence containing "
            "one or both of 'plain' and 'materialized'" % (include,)
        )
    if not kinds:
        raise ValueError(
            "empty include, needs to be a sequence containing "
            "one or both of 'plain' and 'materialized'"
        )

    # kinds only ever holds the fixed codes above, so inlining is safe
    kind_list = ", ".join("'%s'" % kind for kind in kinds)
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind IN (%s)" % kind_list
    ).columns(relname=sqltypes.Unicode)

    if schema is None:
        schema = self.default_schema_name
    rows = connection.execute(statement, {"schema": schema})
    return [row[0] for row in rows]
@reflection.cache
def get_sequence_names(self, connection, schema=None, **kw):
    """Return names of sequences (``relkind='S'``) in ``schema``.

    A falsy ``schema`` selects the dialect's default schema.
    """
    target_schema = schema if schema else self.default_schema_name
    schema_param = sql.bindparam(
        "schema",
        util.text_type(target_schema),
        type_=sqltypes.Unicode,
    )
    statement = sql.text(
        "SELECT relname FROM pg_class c join pg_namespace n on "
        "n.oid=c.relnamespace where relkind='S' and "
        "n.nspname=:schema"
    ).bindparams(schema_param)

    names = []
    for row in connection.execute(statement):
        names.append(row[0])
    return names
@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
    """Return ``pg_get_viewdef()`` output for a plain or materialized view.

    The dialect's default schema is used when ``schema`` is None.
    """
    statement = sql.text(
        "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relname = :view_name "
        "AND c.relkind IN ('v', 'm')"
    ).columns(view_def=sqltypes.Unicode)

    if schema is None:
        schema = self.default_schema_name
    params = {"schema": schema, "view_name": view_name}
    return connection.scalar(statement, params)
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
    """Reflect the columns of ``table_name``.

    Resolves the table oid, selects one row per live attribute from
    ``pg_attribute`` (ordered by ``attnum``) and passes each row, together
    with the loaded domains and enums, to ``_get_column_info``.

    :return: list holding the ``_get_column_info`` result for each column,
     in attribute order.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    # attgenerated is only selected for server version >= 12; a NULL
    # placeholder keeps the result shape identical on older servers.
    generated = (
        "a.attgenerated as generated"
        if self.server_version_info >= (12,)
        else "NULL as generated"
    )
    # identity options are likewise only queried for server version >= 10
    if self.server_version_info >= (10,):
        # a.attidentity != '' is required or it will reflect also
        # serial columns as identity.
        identity = """\
(SELECT json_build_object(
'always', a.attidentity = 'a',
'start', s.seqstart,
'increment', s.seqincrement,
'minvalue', s.seqmin,
'maxvalue', s.seqmax,
'cache', s.seqcache,
'cycle', s.seqcycle)
FROM pg_catalog.pg_sequence s
JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
WHERE c.relkind = 'S'
AND a.attidentity != ''
AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
a.attrelid::regclass::text, a.attname
)::regclass::oid
) as identity_options\
"""
    else:
        identity = "NULL as identity_options"

    # NOTE: the order of the selected columns must match the tuple
    # unpacking in the loop at the end of this method.
    SQL_COLS = """
SELECT a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
(
SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
FROM pg_catalog.pg_attrdef d
WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
AND a.atthasdef
) AS DEFAULT,
a.attnotnull,
a.attrelid as table_oid,
pgd.description as comment,
%s,
%s
FROM pg_catalog.pg_attribute a
LEFT JOIN pg_catalog.pg_description pgd ON (
pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
WHERE a.attrelid = :table_oid
AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum
""" % (
        generated,
        identity,
    )
    s = (
        sql.text(SQL_COLS)
        .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
        .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
    )
    c = connection.execute(s, dict(table_oid=table_oid))
    rows = c.fetchall()

    # dictionary with (name, ) if default search path or (schema, name)
    # as keys
    domains = self._load_domains(connection)

    # dictionary with (name, ) if default search path or (schema, name)
    # as keys
    enums = dict(
        ((rec["name"],), rec)
        if rec["visible"]
        else ((rec["schema"], rec["name"]), rec)
        for rec in self._load_enums(connection, schema="*")
    )

    # format columns
    columns = []

    for (
        name,
        format_type,
        default_,
        notnull,
        table_oid,  # selected by the query but not passed on below
        comment,
        generated,
        identity,
    ) in rows:
        column_info = self._get_column_info(
            name,
            format_type,
            default_,
            notnull,
            domains,
            enums,
            schema,
            comment,
            generated,
            identity,
        )
        columns.append(column_info)
    return columns
def _get_column_info(
    self,
    name,
    format_type,
    default,
    notnull,
    domains,
    enums,
    schema,
    comment,
    generated,
    identity,
):
    """Assemble the reflection dictionary describing one column.

    ``format_type`` is split into a base type name, an optional ``[]``
    array suffix and its parenthesized modifiers.  The base name is then
    resolved through ``self.ischema_names``, ``enums`` and ``domains``
    (domains are followed repeatedly until a known type or an enum is
    reached).  A type that cannot be resolved emits a warning and is
    reported as ``sqltypes.NULLTYPE``.

    :return: dict with the keys ``name``, ``type``, ``nullable``,
     ``default``, ``autoincrement`` and ``comment``; ``computed`` and
     ``identity`` are added only when they apply.
    """

    def _split_array_suffix(type_name):
        # strip '[]' from integer[], etc. and report whether it was there
        return re.sub(r"\[\]$", "", type_name), type_name.endswith("[]")

    # strip (*) from character varying(5), timestamp(5)
    # with time zone, geometry(POLYGON), etc., then peel off any '[]'
    attype, is_array = _split_array_suffix(
        re.sub(r"\(.*\)", "", format_type)
    )

    # strip quotes from case sensitive enum or domain names
    enum_or_domain_key = tuple(util.quoted_token_parser(attype))

    nullable = not notnull

    # purely numeric modifier such as (5) or (10,2), kept as text
    numeric_modifier = re.search(r"\(([\d,]+)\)", format_type)
    charlen = numeric_modifier.group(1) if numeric_modifier else None

    # generic modifier list; these become the positional arguments of the
    # type unless one of the special cases below replaces them
    any_modifier = re.search(r"\((.*)\)", format_type)
    if any_modifier and any_modifier.group(1):
        args = tuple(re.split(r"\s*,\s*", any_modifier.group(1)))
    else:
        args = ()
    kwargs = {}

    tz_aware_names = ("timestamp with time zone", "time with time zone")
    tz_naive_names = (
        "timestamp without time zone",
        "time without time zone",
        "time",
    )

    if attype == "numeric":
        args = ()
        if charlen:
            prec, scale = charlen.split(",")
            args = (int(prec), int(scale))
    elif attype == "double precision":
        args = (53,)
    elif attype == "integer":
        args = ()
    elif attype in tz_aware_names or attype in tz_naive_names:
        kwargs["timezone"] = attype in tz_aware_names
        if charlen:
            kwargs["precision"] = int(charlen)
        args = ()
    elif attype == "bit varying":
        kwargs["varying"] = True
        args = (int(charlen),) if charlen else ()
    elif attype.startswith("interval"):
        field_match = re.match(r"interval (.+)", attype, re.I)
        if charlen:
            kwargs["precision"] = int(charlen)
        if field_match:
            kwargs["fields"] = field_match.group(1)
        attype = "interval"
        args = ()
    elif charlen:
        args = (int(charlen),)

    # looping here to suit nested domains
    while True:
        if attype in self.ischema_names:
            coltype = self.ischema_names[attype]
            break

        if enum_or_domain_key in enums:
            enum = enums[enum_or_domain_key]
            coltype = ENUM
            kwargs["name"] = enum["name"]
            if not enum["visible"]:
                kwargs["schema"] = enum["schema"]
            args = tuple(enum["labels"])
            break

        if enum_or_domain_key not in domains:
            coltype = None
            break

        # a domain: continue the lookup with its underlying type
        domain = domains[enum_or_domain_key]
        attype, is_array = _split_array_suffix(domain["attype"])
        # strip quotes from case sensitive enum or domain names
        enum_or_domain_key = tuple(util.quoted_token_parser(attype))
        # A table can't override a not null on the domain,
        # but can override nullable
        nullable = nullable and domain["nullable"]
        if domain["default"] and not default:
            # It can, however, override the default
            # value, but can't set it to null.
            default = domain["default"]

    if not coltype:
        util.warn(
            "Did not recognize type '%s' of column '%s'" % (attype, name)
        )
        coltype = sqltypes.NULLTYPE
    else:
        coltype = coltype(*args, **kwargs)
        if is_array:
            coltype = self.ischema_names["_array"](coltype)

    # If a zero byte or blank string depending on driver (is also absent
    # for older PG versions), then not a generated column. Otherwise, s =
    # stored. (Other values might be added in the future.)
    computed = None
    if generated not in (None, "", b"\x00"):
        computed = {
            "sqltext": default,
            "persisted": generated in ("s", b"s"),
        }
        # the expression now lives in "computed", not in the default
        default = None

    # adjust the default value
    autoincrement = False
    if default is not None:
        seq_match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
        if seq_match is not None:
            # the default is related to a Sequence
            prefix, sequence_name, suffix = seq_match.groups()
            autoincrement = issubclass(
                coltype._type_affinity, sqltypes.Integer
            )
            if "." not in sequence_name and schema is not None:
                # unconditionally quote the schema name.  this could
                # later be enhanced to obey quoting rules /
                # "quote schema"
                default = (
                    prefix
                    + ('"%s"' % schema)
                    + "."
                    + sequence_name
                    + suffix
                )

    column_info = {
        "name": name,
        "type": coltype,
        "nullable": nullable,
        "default": default,
        "autoincrement": autoincrement or identity is not None,
        "comment": comment,
    }
    if computed is not None:
        column_info["computed"] = computed
    if identity is not None:
        column_info["identity"] = identity
    return column_info
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
    """Return the primary key columns and constraint name of a table.

    Two queries are issued against the catalog: one for the column
    names of the primary key index and one for the name of the
    ``contype = 'p'`` constraint.

    :return: ``{"constrained_columns": [...], "name": <name or None>}``
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    if self.server_version_info >= (8, 4):
        # unnest() and generate_subscripts() both introduced in
        # version 8.4
        columns_sql = """
            SELECT a.attname
            FROM pg_attribute a JOIN (
                SELECT unnest(ix.indkey) attnum,
                       generate_subscripts(ix.indkey, 1) ord
                FROM pg_index ix
                WHERE ix.indrelid = :table_oid AND ix.indisprimary
                ) k ON a.attnum=k.attnum
            WHERE a.attrelid = :table_oid
            ORDER BY k.ord
        """
    else:
        columns_sql = """
            SELECT a.attname
            FROM
                pg_class t
                join pg_index ix on t.oid = ix.indrelid
                join pg_attribute a
                    on t.oid=a.attrelid AND %s
             WHERE
              t.oid = :table_oid and ix.indisprimary = 't'
            ORDER BY a.attnum
        """ % self._pg_index_any(
            "a.attnum", "ix.indkey"
        )

    columns_stmt = sql.text(columns_sql).columns(attname=sqltypes.Unicode)
    columns_result = connection.execute(
        columns_stmt, {"table_oid": table_oid}
    )
    cols = [row[0] for row in columns_result.fetchall()]

    name_sql = """
    SELECT conname
       FROM  pg_catalog.pg_constraint r
       WHERE r.conrelid = :table_oid AND r.contype = 'p'
       ORDER BY 1
    """
    name_stmt = sql.text(name_sql).columns(conname=sqltypes.Unicode)
    name_result = connection.execute(name_stmt, {"table_oid": table_oid})

    return {"constrained_columns": cols, "name": name_result.scalar()}
@reflection.cache
def get_foreign_keys(
    self,
    connection,
    table_name,
    schema=None,
    postgresql_ignore_search_path=False,
    **kw
):
    """Reflect the foreign key constraints of a table.

    Each constraint definition is obtained as text from
    ``pg_get_constraintdef()`` and taken apart with a regular
    expression.

    :param postgresql_ignore_search_path: when true, the referred
     schema is taken from the catalog (``conschema``) rather than from
     the constraint text, falling back to ``schema`` when it equals the
     dialect's default schema name.
    :return: list of dicts with the keys ``name``,
     ``constrained_columns``, ``referred_schema``, ``referred_table``,
     ``referred_columns`` and ``options``.
    """
    preparer = self.identifier_preparer
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    FK_SQL = """
      SELECT r.conname,
            pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
            n.nspname as conschema
      FROM  pg_catalog.pg_constraint r,
            pg_namespace n,
            pg_class c

      WHERE r.conrelid = :table AND
            r.contype = 'f' AND
            c.oid = confrelid AND
            n.oid = c.relnamespace
      ORDER BY 1
    """
    # https://www.postgresql.org/docs/9.0/static/sql-createtable.html
    FK_REGEX = re.compile(
        r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
        r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
        r"[\s]?(ON UPDATE "
        r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
        r"[\s]?(ON DELETE "
        r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
        r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
        r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
    )

    stmt = sql.text(FK_SQL).columns(
        conname=sqltypes.Unicode, condef=sqltypes.Unicode
    )
    result = connection.execute(stmt, {"table": table_oid})

    fkeys = []
    for conname, condef, conschema in result.fetchall():
        groups = FK_REGEX.search(condef).groups()

        # groups 4, 6, 8 and 11 are the enclosing "MATCH ...",
        # "ON UPDATE ...", "ON DELETE ..." and "INITIALLY ..." phrases;
        # only the keyword captured inside each of them is used
        constrained_columns = groups[0]
        referred_schema = groups[1]
        referred_table = groups[2]
        referred_columns = groups[3]
        match = groups[5]
        onupdate = groups[7]
        ondelete = groups[9]
        deferrable = groups[10]
        initially = groups[12]

        if deferrable is not None:
            deferrable = deferrable == "DEFERRABLE"

        if postgresql_ignore_search_path:
            # when ignoring search path, we use the actual schema
            # provided it isn't the "default" schema
            if conschema != self.default_schema_name:
                referred_schema = conschema
            else:
                referred_schema = schema
        elif referred_schema:
            # referred_schema is the schema that we regexp'ed from
            # pg_get_constraintdef().  If the schema is in the search
            # path, pg_get_constraintdef() will give us None.
            referred_schema = preparer._unquote_identifier(referred_schema)
        elif schema is not None and schema == conschema:
            # If the actual schema matches the schema of the table
            # we're reflecting, then we will use that.
            referred_schema = schema

        constrained_columns = [
            preparer._unquote_identifier(col_name)
            for col_name in re.split(r"\s*,\s*", constrained_columns)
        ]
        referred_table = preparer._unquote_identifier(referred_table)
        referred_columns = [
            preparer._unquote_identifier(col_name)
            for col_name in re.split(r"\s*,\s", referred_columns)
        ]

        # only report options that were given and differ from "NO ACTION"
        options = {}
        for opt_name, opt_value in (
            ("onupdate", onupdate),
            ("ondelete", ondelete),
            ("initially", initially),
            ("deferrable", deferrable),
            ("match", match),
        ):
            if opt_value is not None and opt_value != "NO ACTION":
                options[opt_name] = opt_value

        fkeys.append(
            {
                "name": conname,
                "constrained_columns": constrained_columns,
                "referred_schema": referred_schema,
                "referred_table": referred_table,
                "referred_columns": referred_columns,
                "options": options,
            }
        )
    return fkeys
def _pg_index_any(self, col, compare_to):
    """Return a SQL fragment testing that ``col`` occurs in ``compare_to``.

    For servers at version 8.1 or later this is ``col = ANY(compare_to)``.
    Older servers get a parenthesized chain of ``OR``-ed comparisons
    against subscripts 0 through 9 of ``compare_to``.
    """
    if self.server_version_info >= (8, 1):
        return "%s = ANY(%s)" % (col, compare_to)

    # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
    # "In CVS tip you could replace this with "attnum = ANY (indkey)".
    # Unfortunately, most array support doesn't work on int2vector in
    # pre-8.1 releases, so I think you're kinda stuck with the above
    # for now.
    # regards, tom lane"
    comparisons = [
        "%s[%d] = %s" % (compare_to, subscript, col)
        for subscript in range(10)
    ]
    return "(%s)" % " OR ".join(comparisons)
@reflection.cache
def get_indexes(self, connection, table_name, schema, **kw):
    """Reflect the non-primary-key indexes of a table.

    One row per (index, column) pair is read from the catalog and
    folded into one record per index.  Expression-based indexes are
    skipped with a warning.

    :return: list of dicts with the keys ``name``, ``unique`` and
     ``column_names``; depending on the index and the server version
     also ``include_columns``, ``duplicates_constraint``,
     ``column_sorting`` and ``dialect_options`` (``postgresql_include``,
     ``postgresql_with``, ``postgresql_using``, ``postgresql_where``).
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    # cast indkey as varchar since it's an int2vector,
    # returned as a list by some drivers such as pypostgresql

    if self.server_version_info < (8, 5):
        # NOTE: rows are unpacked positionally further down, so this
        # select list has to use the same column order as the query for
        # newer servers: the index predicate comes right before
        # indnkeyatts, and NULL stands in for the constraint join.
        IDX_SQL = """
          SELECT
              i.relname as relname,
              ix.indisunique, ix.indexprs,
              a.attname, a.attnum, NULL, ix.indkey%s,
              %s, %s, am.amname,
              pg_get_expr(ix.indpred, ix.indrelid),
              NULL as indnkeyatts
          FROM
              pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and %s
                    left outer join
                        pg_am am
                        on i.relam = am.oid
          WHERE
              t.relkind IN ('r', 'v', 'f', 'm')
              and t.oid = :table_oid
              and ix.indisprimary = 'f'
          ORDER BY
              t.relname,
              i.relname
        """ % (
            # version 8.3 here was based on observing the
            # cast does not work in PG 8.2.4, does work in 8.3.0.
            # nothing in PG changelogs regarding this.
            "::varchar" if self.server_version_info >= (8, 3) else "",
            "ix.indoption::varchar"
            if self.server_version_info >= (8, 3)
            else "NULL",
            "i.reloptions"
            if self.server_version_info >= (8, 2)
            else "NULL",
            self._pg_index_any("a.attnum", "ix.indkey"),
        )
    else:
        IDX_SQL = """
          SELECT
              i.relname as relname,
              ix.indisunique, ix.indexprs,
              a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
              ix.indoption::varchar, i.reloptions, am.amname,
              pg_get_expr(ix.indpred, ix.indrelid),
              %s as indnkeyatts
          FROM
              pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
                    left outer join
                        pg_constraint c
                        on (ix.indrelid = c.conrelid and
                            ix.indexrelid = c.conindid and
                            c.contype in ('p', 'u', 'x'))
                    left outer join
                        pg_am am
                        on i.relam = am.oid
          WHERE
              t.relkind IN ('r', 'v', 'f', 'm', 'p')
              and t.oid = :table_oid
              and ix.indisprimary = 'f'
          ORDER BY
              t.relname,
              i.relname
        """ % (
            "ix.indnkeyatts"
            if self.server_version_info >= (11, 0)
            else "NULL",
        )

    t = sql.text(IDX_SQL).columns(
        relname=sqltypes.Unicode, attname=sqltypes.Unicode
    )
    c = connection.execute(t, dict(table_oid=table_oid))

    indexes = defaultdict(lambda: defaultdict(dict))

    # name of the last expression-based index seen, so that the warning
    # is emitted once per index rather than once per row
    sv_idx_name = None
    for row in c.fetchall():
        (
            idx_name,
            unique,
            expr,
            col,
            col_num,
            conrelid,
            idx_key,
            idx_option,
            options,
            amname,
            filter_definition,
            indnkeyatts,
        ) = row

        if expr:
            if idx_name != sv_idx_name:
                util.warn(
                    "Skipped unsupported reflection of "
                    "expression-based index %s" % idx_name
                )
            sv_idx_name = idx_name
            continue

        has_idx = idx_name in indexes
        index = indexes[idx_name]
        if col is not None:
            index["cols"][col_num] = col
        if not has_idx:
            # per-index attributes are identical on every row of the
            # index, so they are recorded from its first row only
            idx_keys = idx_key.split()
            # "The number of key columns in the index, not counting any
            # included columns, which are merely stored and do not
            # participate in the index semantics"
            if indnkeyatts and idx_keys[indnkeyatts:]:
                # this is a "covering index" which has INCLUDE columns
                # as well as regular index columns
                inc_keys = idx_keys[indnkeyatts:]
                idx_keys = idx_keys[:indnkeyatts]
            else:
                inc_keys = []

            index["key"] = [int(k.strip()) for k in idx_keys]
            index["inc"] = [int(k.strip()) for k in inc_keys]

            # (new in pg 8.3)
            # "pg_index.indoption" is list of ints, one per column/expr.
            # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
            sorting = {}
            for col_idx, col_flags in enumerate(
                (idx_option or "").split()
            ):
                col_flags = int(col_flags.strip())
                col_sorting = ()
                # try to set flags only if they differ from PG defaults...
                if col_flags & 0x01:
                    col_sorting += ("desc",)
                    if not (col_flags & 0x02):
                        col_sorting += ("nulls_last",)
                else:
                    if col_flags & 0x02:
                        col_sorting += ("nulls_first",)
                if col_sorting:
                    sorting[col_idx] = col_sorting
            if sorting:
                index["sorting"] = sorting

            index["unique"] = unique
            if conrelid is not None:
                index["duplicates_constraint"] = idx_name
            if options:
                # split on the first "=" only, so that an option value
                # which itself contains "=" still yields a (key, value)
                # pair
                index["options"] = dict(
                    option.split("=", 1) for option in options
                )

            # it *might* be nice to include that this is 'btree' in the
            # reflection info.  But we don't want an Index object
            # to have a ``postgresql_using`` in it that is just the
            # default, so for the moment leaving this out.
            if amname and amname != "btree":
                index["amname"] = amname

            if filter_definition:
                index["postgresql_where"] = filter_definition

    result = []
    for name, idx in indexes.items():
        entry = {
            "name": name,
            "unique": idx["unique"],
            "column_names": [idx["cols"][i] for i in idx["key"]],
        }
        if self.server_version_info >= (11, 0):
            # NOTE: this is legacy, this is part of dialect_options now
            # as of #7382
            entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]]
        if "duplicates_constraint" in idx:
            entry["duplicates_constraint"] = idx["duplicates_constraint"]
        if "sorting" in idx:
            # "sorting" is keyed by position within the index key; map
            # each position back to the column name
            entry["column_sorting"] = dict(
                (idx["cols"][idx["key"][i]], value)
                for i, value in idx["sorting"].items()
            )
        if "include_columns" in entry:
            entry.setdefault("dialect_options", {})[
                "postgresql_include"
            ] = entry["include_columns"]
        if "options" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_with"
            ] = idx["options"]
        if "amname" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_using"
            ] = idx["amname"]
        if "postgresql_where" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_where"
            ] = idx["postgresql_where"]
        result.append(entry)
    return result
@reflection.cache
def get_unique_constraints(
    self, connection, table_name, schema=None, **kw
):
    """Reflect the UNIQUE constraints (``contype = 'u'``) of a table.

    :return: list of ``{"name": ..., "column_names": [...]}`` dicts,
     with the column names listed in the order given by ``conkey``.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    UNIQUE_SQL = """
        SELECT
            cons.conname as name,
            cons.conkey as key,
            a.attnum as col_num,
            a.attname as col_name
        FROM
            pg_catalog.pg_constraint cons
            join pg_attribute a
              on cons.conrelid = a.attrelid AND
                a.attnum = ANY(cons.conkey)
        WHERE
            cons.conrelid = :table_oid AND
            cons.contype = 'u'
    """

    stmt = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
    rows = connection.execute(stmt, {"table_oid": table_oid}).fetchall()

    # one row arrives per (constraint, column) pair; collect the rows of
    # each constraint into a single record
    by_name = {}
    for row in rows:
        record = by_name.setdefault(row.name, {"cols": {}})
        record["key"] = row.key
        record["cols"][row.col_num] = row.col_name

    constraints = []
    for name, record in by_name.items():
        ordered_names = [record["cols"][num] for num in record["key"]]
        constraints.append({"name": name, "column_names": ordered_names})
    return constraints
@reflection.cache
def get_table_comment(self, connection, table_name, schema=None, **kw):
    """Return the comment stored for a table as ``{"text": <comment>}``.

    The value is whatever ``scalar()`` yields for the
    ``pg_description`` row with ``objsubid = 0`` and the table's oid.
    """
    info_cache = kw.get("info_cache")
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=info_cache
    )

    COMMENT_SQL = """
        SELECT
            pgd.description as table_comment
        FROM
            pg_catalog.pg_description pgd
        WHERE
            pgd.objsubid = 0 AND
            pgd.objoid = :table_oid
    """

    stmt = sql.text(COMMENT_SQL)
    result = connection.execute(stmt, {"table_oid": table_oid})
    comment_text = result.scalar()
    return {"text": comment_text}
@reflection.cache
def get_check_constraints(self, connection, table_name, schema=None, **kw):
    """Reflect the CHECK constraints (``contype = 'c'``) of a table.

    The constraint text comes from ``pg_get_constraintdef()``; the
    ``CHECK`` keyword, one redundant pair of enclosing parentheses and
    the trailing ``NO INHERIT`` / ``NOT VALID`` markers are stripped
    from it.

    :return: list of dicts with the keys ``name`` and ``sqltext``.
     ``dialect_options`` is added with ``not_valid`` and/or
     ``no_inherit`` set to ``True`` when the definition carries the
     corresponding marker.  A definition that cannot be parsed emits a
     warning and is reported with an empty ``sqltext``.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    CHECK_SQL = """
        SELECT
            cons.conname as name,
            pg_get_constraintdef(cons.oid) as src
        FROM
            pg_catalog.pg_constraint cons
        WHERE
            cons.conrelid = :table_oid AND
            cons.contype = 'c'
    """

    c = connection.execute(sql.text(CHECK_SQL), dict(table_oid=table_oid))

    # samples:
    # "CHECK (((a > 1) AND (a < 5)))"
    # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
    # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
    # "CHECK (((a > 1) AND (a < 5))) NO INHERIT"
    # "CHECK (some_boolean_function(a))"
    # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
    #
    # " NO INHERIT" is accepted ahead of " NOT VALID"; without it, a
    # constraint declared NO INHERIT would fail to parse altogether.
    definition_re = re.compile(
        r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", flags=re.DOTALL
    )
    # removes one pair of parentheses wrapping the whole expression
    outer_parens_re = re.compile(
        r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
    )

    ret = []
    for name, src in c:
        m = definition_re.match(src)
        if not m:
            util.warn("Could not parse CHECK constraint text: %r" % src)
            ret.append({"name": name, "sqltext": ""})
            continue

        entry = {
            "name": name,
            "sqltext": outer_parens_re.sub(r"\1", m.group(1)),
        }
        dialect_options = {}
        if m.group(3):
            dialect_options["not_valid"] = True
        if m.group(2):
            dialect_options["no_inherit"] = True
        if dialect_options:
            entry["dialect_options"] = dialect_options
        ret.append(entry)
    return ret
def _load_enums(self, connection, schema=None):
    """Load the enum types of a schema together with their labels.

    :param schema: schema to inspect; falls back to
     ``self.default_schema_name`` when empty, and ``"*"`` selects the
     enums of every schema.
    :return: list of dicts with the keys ``name``, ``schema``,
     ``visible`` and ``labels``, ordered by schema and name.  The list
     is empty when ``self.supports_native_enum`` is false, so the
     return value is a list in every case.
    """
    schema = schema or self.default_schema_name
    if not self.supports_native_enum:
        return []

    # Load data types for enums:
    SQL_ENUMS = """
        SELECT t.typname as "name",
           -- no enum defaults in 8.4 at least
           -- t.typdefault as "default",
           pg_catalog.pg_type_is_visible(t.oid) as "visible",
           n.nspname as "schema",
           e.enumlabel as "label"
        FROM pg_catalog.pg_type t
             LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
             LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
        WHERE t.typtype = 'e'
    """

    if schema != "*":
        SQL_ENUMS += "AND n.nspname = :schema "

    # e.oid gives us label order within an enum
    SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'

    s = sql.text(SQL_ENUMS).columns(
        attname=sqltypes.Unicode, label=sqltypes.Unicode
    )

    if schema != "*":
        s = s.bindparams(schema=schema)

    c = connection.execute(s)

    enums = []
    enum_by_name = {}
    for enum in c.fetchall():
        key = (enum.schema, enum.name)
        if key in enum_by_name:
            enum_by_name[key]["labels"].append(enum.label)
        else:
            enum_by_name[key] = enum_rec = {
                "name": enum.name,
                "schema": enum.schema,
                "visible": enum.visible,
                "labels": [],
            }
            # the LEFT JOIN yields a NULL label for an enum that has no
            # labels at all
            if enum.label is not None:
                enum_rec["labels"].append(enum.label)
            enums.append(enum_rec)
    return enums
def _load_domains(self, connection):
    """Load every domain type known to the database.

    :return: dict mapping a key tuple to a dict with the keys
     ``attype``, ``nullable`` and ``default``.  The key is ``(name,)``
     for a visible domain and ``(schema, name)`` otherwise.
    """
    # Load data types for domains:
    SQL_DOMAINS = """
        SELECT t.typname as "name",
           pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
           not t.typnotnull as "nullable",
           t.typdefault as "default",
           pg_catalog.pg_type_is_visible(t.oid) as "visible",
           n.nspname as "schema"
        FROM pg_catalog.pg_type t
           LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
        WHERE t.typtype = 'd'
    """

    stmt = sql.text(SQL_DOMAINS)
    result = connection.execution_options(future_result=True).execute(stmt)

    domains = {}
    for row in result.mappings():
        # strip (30) from character varying(30)
        base_type = re.search(r"([^\(]+)", row["attype"]).group(1)

        # 'visible' just means whether or not the domain is in a
        # schema that's on the search path -- or not overridden by
        # a schema with higher precedence. If it's not visible,
        # it will be prefixed with the schema-name when it's used.
        key = (
            (row["name"],)
            if row["visible"]
            else (row["schema"], row["name"])
        )

        domains[key] = {
            "attype": base_type,
            "nullable": row["nullable"],
            "default": row["default"],
        }
    return domains
|