expressions.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import sqlalchemy as sa
  2. from sqlalchemy.dialects import postgresql
  3. from sqlalchemy.ext.compiler import compiles
  4. from sqlalchemy.sql.expression import ColumnElement, FunctionElement
  5. from sqlalchemy.sql.functions import GenericFunction
  6. from .functions.orm import quote
  7. class array_get(FunctionElement):
  8. name = 'array_get'
  9. @compiles(array_get)
  10. def compile_array_get(element, compiler, **kw):
  11. args = list(element.clauses)
  12. if len(args) != 2:
  13. raise Exception(
  14. "Function 'array_get' expects two arguments (%d given)." %
  15. len(args)
  16. )
  17. if not hasattr(args[1], 'value') or not isinstance(args[1].value, int):
  18. raise Exception(
  19. "Second argument should be an integer."
  20. )
  21. return '(%s)[%s]' % (
  22. compiler.process(args[0]),
  23. sa.text(str(args[1].value + 1))
  24. )
  25. class row_to_json(GenericFunction):
  26. name = 'row_to_json'
  27. type = postgresql.JSON
  28. @compiles(row_to_json, 'postgresql')
  29. def compile_row_to_json(element, compiler, **kw):
  30. return "%s(%s)" % (element.name, compiler.process(element.clauses))
  31. class json_array_length(GenericFunction):
  32. name = 'json_array_length'
  33. type = sa.Integer
  34. @compiles(json_array_length, 'postgresql')
  35. def compile_json_array_length(element, compiler, **kw):
  36. return "%s(%s)" % (element.name, compiler.process(element.clauses))
  37. class Asterisk(ColumnElement):
  38. def __init__(self, selectable):
  39. self.selectable = selectable
  40. @compiles(Asterisk)
  41. def compile_asterisk(element, compiler, **kw):
  42. return '%s.*' % quote(compiler.dialect, element.selectable.name)