123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import sqlalchemy as sa
- from sqlalchemy.sql.util import ClauseAdapter
- from .chained_join import chained_join # noqa
- def path_to_relationships(path, cls):
- relationships = []
- for path_name in path.split('.'):
- rel = getattr(cls, path_name)
- relationships.append(rel)
- cls = rel.mapper.class_
- return relationships
- def adapt_expr(expr, *selectables):
- for selectable in selectables:
- expr = ClauseAdapter(selectable).traverse(expr)
- return expr
- def inverse_join(selectable, left_alias, right_alias, relationship):
- if relationship.property.secondary is not None:
- secondary_alias = sa.alias(relationship.property.secondary)
- return selectable.join(
- secondary_alias,
- adapt_expr(
- relationship.property.secondaryjoin,
- sa.inspect(left_alias).selectable,
- secondary_alias
- )
- ).join(
- right_alias,
- adapt_expr(
- relationship.property.primaryjoin,
- sa.inspect(right_alias).selectable,
- secondary_alias
- )
- )
- else:
- join = sa.orm.join(right_alias, left_alias, relationship)
- onclause = join.onclause
- return selectable.join(right_alias, onclause)
- def relationship_to_correlation(relationship, alias):
- if relationship.property.secondary is not None:
- return adapt_expr(
- relationship.property.primaryjoin,
- alias,
- )
- else:
- return sa.orm.join(
- relationship.parent,
- alias,
- relationship
- ).onclause
- def chained_inverse_join(relationships, leaf_model):
- selectable = sa.inspect(leaf_model).selectable
- aliases = [leaf_model]
- for index, relationship in enumerate(relationships[1:]):
- aliases.append(sa.orm.aliased(relationship.mapper.class_))
- selectable = inverse_join(
- selectable,
- aliases[index],
- aliases[index + 1],
- relationships[index]
- )
- if relationships[-1].property.secondary is not None:
- secondary_alias = sa.alias(relationships[-1].property.secondary)
- selectable = selectable.join(
- secondary_alias,
- adapt_expr(
- relationships[-1].property.secondaryjoin,
- secondary_alias,
- sa.inspect(aliases[-1]).selectable
- )
- )
- aliases.append(secondary_alias)
- return selectable, aliases
- def select_correlated_expression(
- root_model,
- expr,
- path,
- leaf_model,
- from_obj=None,
- order_by=None,
- correlate=True
- ):
- relationships = list(reversed(path_to_relationships(path, root_model)))
- query = sa.select([expr])
- join_expr, aliases = chained_inverse_join(relationships, leaf_model)
- if order_by:
- query = query.order_by(
- *[
- adapt_expr(
- o,
- *(sa.inspect(alias).selectable for alias in aliases)
- )
- for o in order_by
- ]
- )
- condition = relationship_to_correlation(
- relationships[-1],
- aliases[-1]
- )
- if from_obj is not None:
- condition = adapt_expr(condition, from_obj)
- query = query.select_from(join_expr.selectable)
- if correlate:
- query = query.correlate(
- from_obj if from_obj is not None else root_model
- )
- return query.where(condition)
|