view.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import sqlalchemy as sa
  2. from sqlalchemy.ext import compiler
  3. from sqlalchemy.schema import DDLElement, PrimaryKeyConstraint
  4. from sqlalchemy_utils.functions import get_columns
  5. class CreateView(DDLElement):
  6. def __init__(self, name, selectable, materialized=False):
  7. self.name = name
  8. self.selectable = selectable
  9. self.materialized = materialized
  10. @compiler.compiles(CreateView)
  11. def compile_create_materialized_view(element, compiler, **kw):
  12. return 'CREATE {}VIEW {} AS {}'.format(
  13. 'MATERIALIZED ' if element.materialized else '',
  14. compiler.dialect.identifier_preparer.quote(element.name),
  15. compiler.sql_compiler.process(element.selectable, literal_binds=True),
  16. )
  17. class DropView(DDLElement):
  18. def __init__(self, name, materialized=False, cascade=True):
  19. self.name = name
  20. self.materialized = materialized
  21. self.cascade = cascade
  22. @compiler.compiles(DropView)
  23. def compile_drop_materialized_view(element, compiler, **kw):
  24. return 'DROP {}VIEW IF EXISTS {} {}'.format(
  25. 'MATERIALIZED ' if element.materialized else '',
  26. compiler.dialect.identifier_preparer.quote(element.name),
  27. 'CASCADE' if element.cascade else ''
  28. )
  29. def create_table_from_selectable(
  30. name,
  31. selectable,
  32. indexes=None,
  33. metadata=None,
  34. aliases=None,
  35. **kwargs
  36. ):
  37. if indexes is None:
  38. indexes = []
  39. if metadata is None:
  40. metadata = sa.MetaData()
  41. if aliases is None:
  42. aliases = {}
  43. args = [
  44. sa.Column(
  45. c.name,
  46. c.type,
  47. key=aliases.get(c.name, c.name),
  48. primary_key=c.primary_key
  49. )
  50. for c in get_columns(selectable)
  51. ] + indexes
  52. table = sa.Table(name, metadata, *args, **kwargs)
  53. if not any([c.primary_key for c in get_columns(selectable)]):
  54. table.append_constraint(
  55. PrimaryKeyConstraint(*[c.name for c in get_columns(selectable)])
  56. )
  57. return table
  58. def create_materialized_view(
  59. name,
  60. selectable,
  61. metadata,
  62. indexes=None,
  63. aliases=None
  64. ):
  65. """ Create a view on a given metadata
  66. :param name: The name of the view to create.
  67. :param selectable: An SQLAlchemy selectable e.g. a select() statement.
  68. :param metadata:
  69. An SQLAlchemy Metadata instance that stores the features of the
  70. database being described.
  71. :param indexes: An optional list of SQLAlchemy Index instances.
  72. :param aliases:
  73. An optional dictionary containing with keys as column names and values
  74. as column aliases.
  75. Same as for ``create_view`` except that a ``CREATE MATERIALIZED VIEW``
  76. statement is emitted instead of a ``CREATE VIEW``.
  77. """
  78. table = create_table_from_selectable(
  79. name=name,
  80. selectable=selectable,
  81. indexes=indexes,
  82. metadata=None,
  83. aliases=aliases
  84. )
  85. sa.event.listen(
  86. metadata,
  87. 'after_create',
  88. CreateView(name, selectable, materialized=True)
  89. )
  90. @sa.event.listens_for(metadata, 'after_create')
  91. def create_indexes(target, connection, **kw):
  92. for idx in table.indexes:
  93. idx.create(connection)
  94. sa.event.listen(
  95. metadata,
  96. 'before_drop',
  97. DropView(name, materialized=True)
  98. )
  99. return table
  100. def create_view(
  101. name,
  102. selectable,
  103. metadata,
  104. cascade_on_drop=True
  105. ):
  106. """ Create a view on a given metadata
  107. :param name: The name of the view to create.
  108. :param selectable: An SQLAlchemy selectable e.g. a select() statement.
  109. :param metadata:
  110. An SQLAlchemy Metadata instance that stores the features of the
  111. database being described.
  112. The process for creating a view is similar to the standard way that a
  113. table is constructed, except that a selectable is provided instead of
  114. a set of columns. The view is created once a ``CREATE`` statement is
  115. executed against the supplied metadata (e.g. ``metadata.create_all(..)``),
  116. and dropped when a ``DROP`` is executed against the metadata.
  117. To create a view that performs basic filtering on a table. ::
  118. metadata = MetaData()
  119. users = Table('users', metadata,
  120. Column('id', Integer, primary_key=True),
  121. Column('name', String),
  122. Column('fullname', String),
  123. Column('premium_user', Boolean, default=False),
  124. )
  125. premium_members = select([users]).where(users.c.premium_user == True)
  126. create_view('premium_users', premium_members, metadata)
  127. metadata.create_all(engine) # View is created at this point
  128. """
  129. table = create_table_from_selectable(
  130. name=name,
  131. selectable=selectable,
  132. metadata=None
  133. )
  134. sa.event.listen(metadata, 'after_create', CreateView(name, selectable))
  135. @sa.event.listens_for(metadata, 'after_create')
  136. def create_indexes(target, connection, **kw):
  137. for idx in table.indexes:
  138. idx.create(connection)
  139. sa.event.listen(
  140. metadata,
  141. 'before_drop',
  142. DropView(name, cascade=cascade_on_drop)
  143. )
  144. return table
  145. def refresh_materialized_view(session, name, concurrently=False):
  146. """ Refreshes an already existing materialized view
  147. :param session: An SQLAlchemy Session instance.
  148. :param name: The name of the materialized view to refresh.
  149. :param concurrently:
  150. Optional flag that causes the ``CONCURRENTLY`` parameter
  151. to be specified when the materialized view is refreshed.
  152. """
  153. # Since session.execute() bypasses autoflush, we must manually flush in
  154. # order to include newly-created/modified objects in the refresh.
  155. session.flush()
  156. session.execute(
  157. 'REFRESH MATERIALIZED VIEW {}{}'.format(
  158. 'CONCURRENTLY ' if concurrently else '',
  159. session.bind.engine.dialect.identifier_preparer.quote(name)
  160. )
  161. )