__init__.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import sqlalchemy as sa
  2. from sqlalchemy.sql.util import ClauseAdapter
  3. from .chained_join import chained_join # noqa
  4. def path_to_relationships(path, cls):
  5. relationships = []
  6. for path_name in path.split('.'):
  7. rel = getattr(cls, path_name)
  8. relationships.append(rel)
  9. cls = rel.mapper.class_
  10. return relationships
  11. def adapt_expr(expr, *selectables):
  12. for selectable in selectables:
  13. expr = ClauseAdapter(selectable).traverse(expr)
  14. return expr
  15. def inverse_join(selectable, left_alias, right_alias, relationship):
  16. if relationship.property.secondary is not None:
  17. secondary_alias = sa.alias(relationship.property.secondary)
  18. return selectable.join(
  19. secondary_alias,
  20. adapt_expr(
  21. relationship.property.secondaryjoin,
  22. sa.inspect(left_alias).selectable,
  23. secondary_alias
  24. )
  25. ).join(
  26. right_alias,
  27. adapt_expr(
  28. relationship.property.primaryjoin,
  29. sa.inspect(right_alias).selectable,
  30. secondary_alias
  31. )
  32. )
  33. else:
  34. join = sa.orm.join(right_alias, left_alias, relationship)
  35. onclause = join.onclause
  36. return selectable.join(right_alias, onclause)
  37. def relationship_to_correlation(relationship, alias):
  38. if relationship.property.secondary is not None:
  39. return adapt_expr(
  40. relationship.property.primaryjoin,
  41. alias,
  42. )
  43. else:
  44. return sa.orm.join(
  45. relationship.parent,
  46. alias,
  47. relationship
  48. ).onclause
  49. def chained_inverse_join(relationships, leaf_model):
  50. selectable = sa.inspect(leaf_model).selectable
  51. aliases = [leaf_model]
  52. for index, relationship in enumerate(relationships[1:]):
  53. aliases.append(sa.orm.aliased(relationship.mapper.class_))
  54. selectable = inverse_join(
  55. selectable,
  56. aliases[index],
  57. aliases[index + 1],
  58. relationships[index]
  59. )
  60. if relationships[-1].property.secondary is not None:
  61. secondary_alias = sa.alias(relationships[-1].property.secondary)
  62. selectable = selectable.join(
  63. secondary_alias,
  64. adapt_expr(
  65. relationships[-1].property.secondaryjoin,
  66. secondary_alias,
  67. sa.inspect(aliases[-1]).selectable
  68. )
  69. )
  70. aliases.append(secondary_alias)
  71. return selectable, aliases
  72. def select_correlated_expression(
  73. root_model,
  74. expr,
  75. path,
  76. leaf_model,
  77. from_obj=None,
  78. order_by=None,
  79. correlate=True
  80. ):
  81. relationships = list(reversed(path_to_relationships(path, root_model)))
  82. query = sa.select([expr])
  83. join_expr, aliases = chained_inverse_join(relationships, leaf_model)
  84. if order_by:
  85. query = query.order_by(
  86. *[
  87. adapt_expr(
  88. o,
  89. *(sa.inspect(alias).selectable for alias in aliases)
  90. )
  91. for o in order_by
  92. ]
  93. )
  94. condition = relationship_to_correlation(
  95. relationships[-1],
  96. aliases[-1]
  97. )
  98. if from_obj is not None:
  99. condition = adapt_expr(condition, from_obj)
  100. query = query.select_from(join_expr.selectable)
  101. if correlate:
  102. query = query.correlate(
  103. from_obj if from_obj is not None else root_model
  104. )
  105. return query.where(condition)