관리-도구
편집 파일: strategies.py
# orm/strategies.py # Copyright (C) 2005-2021 the SQLAlchemy authors and contributors # <see AUTHORS file> # # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php """sqlalchemy.orm.interfaces.LoaderStrategy implementations, and related MapperOptions.""" from __future__ import absolute_import import collections import itertools from . import attributes from . import exc as orm_exc from . import interfaces from . import loading from . import properties from . import query from . import unitofwork from . import util as orm_util from .base import _DEFER_FOR_STATE from .base import _SET_DEFERRED_EXPIRED from .interfaces import LoaderStrategy from .interfaces import StrategizedProperty from .session import _state_session from .state import InstanceState from .util import _none_set from .util import aliased from .. import event from .. import exc as sa_exc from .. import inspect from .. import log from .. import sql from .. import util from ..sql import util as sql_util from ..sql import visitors def _register_attribute( prop, mapper, useobject, compare_function=None, typecallable=None, callable_=None, proxy_property=None, active_history=False, impl_class=None, **kw ): attribute_ext = list(util.to_list(prop.extension, default=[])) listen_hooks = [] uselist = useobject and prop.uselist if useobject and prop.single_parent: listen_hooks.append(single_parent_validator) if prop.key in prop.parent.validators: fn, opts = prop.parent.validators[prop.key] listen_hooks.append( lambda desc, prop: orm_util._validator_events( desc, prop.key, fn, **opts ) ) if useobject: listen_hooks.append(unitofwork.track_cascade_events) # need to assemble backref listeners # after the singleparentvalidator, mapper validator if useobject: backref = prop.back_populates if backref and prop._effective_sync_backref: listen_hooks.append( lambda desc, prop: attributes.backref_listeners( desc, backref, uselist ) ) # a single MapperProperty is shared down a class inheritance # hierarchy, so we set up attribute instrumentation and backref event # for each mapper down the hierarchy. # typically, "mapper" is the same as prop.parent, due to the way # the configure_mappers() process runs, however this is not strongly # enforced, and in the case of a second configure_mappers() run the # mapper here might not be prop.parent; also, a subclass mapper may # be called here before a superclass mapper. That is, can't depend # on mappers not already being set up so we have to check each one. for m in mapper.self_and_descendants: if prop is m._props.get( prop.key ) and not m.class_manager._attr_has_impl(prop.key): desc = attributes.register_attribute_impl( m.class_, prop.key, parent_token=prop, uselist=uselist, compare_function=compare_function, useobject=useobject, extension=attribute_ext, trackparent=useobject and ( prop.single_parent or prop.direction is interfaces.ONETOMANY ), typecallable=typecallable, callable_=callable_, active_history=active_history, impl_class=impl_class, send_modified_events=not useobject or not prop.viewonly, doc=prop.doc, **kw ) for hook in listen_hooks: hook(desc, prop) @properties.ColumnProperty.strategy_for(instrument=False, deferred=False) class UninstrumentedColumnLoader(LoaderStrategy): """Represent a non-instrumented MapperProperty. The polymorphic_on argument of mapper() often results in this, if the argument is against the with_polymorphic selectable. """ __slots__ = ("columns",) def __init__(self, parent, strategy_key): super(UninstrumentedColumnLoader, self).__init__(parent, strategy_key) self.columns = self.parent_property.columns def setup_query( self, context, query_entity, path, loadopt, adapter, column_collection=None, **kwargs ): for c in self.columns: if adapter: c = adapter.columns[c] column_collection.append(c) def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): pass @log.class_logger @properties.ColumnProperty.strategy_for(instrument=True, deferred=False) class ColumnLoader(LoaderStrategy): """Provide loading behavior for a :class:`.ColumnProperty`.""" __slots__ = "columns", "is_composite" def __init__(self, parent, strategy_key): super(ColumnLoader, self).__init__(parent, strategy_key) self.columns = self.parent_property.columns self.is_composite = hasattr(self.parent_property, "composite_class") def setup_query( self, context, query_entity, path, loadopt, adapter, column_collection, memoized_populators, **kwargs ): for c in self.columns: if adapter: c = adapter.columns[c] column_collection.append(c) fetch = self.columns[0] if adapter: fetch = adapter.columns[fetch] memoized_populators[self.parent_property] = fetch def init_class_attribute(self, mapper): self.is_class_level = True coltype = self.columns[0].type # TODO: check all columns ? check for foreign key as well? active_history = ( self.parent_property.active_history or self.columns[0].primary_key or mapper.version_id_col in set(self.columns) ) _register_attribute( self.parent_property, mapper, useobject=False, compare_function=coltype.compare_values, active_history=active_history, ) def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): # look through list of columns represented here # to see which, if any, is present in the row. for col in self.columns: if adapter: col = adapter.columns[col] getter = result._getter(col, False) if getter: populators["quick"].append((self.key, getter)) break else: populators["expire"].append((self.key, True)) @log.class_logger @properties.ColumnProperty.strategy_for(query_expression=True) class ExpressionColumnLoader(ColumnLoader): def __init__(self, parent, strategy_key): super(ExpressionColumnLoader, self).__init__(parent, strategy_key) null = sql.null() self._have_default_expression = any( not c.compare(null) for c in self.parent_property.columns ) def setup_query( self, context, query_entity, path, loadopt, adapter, column_collection, memoized_populators, **kwargs ): columns = None if loadopt and "expression" in loadopt.local_opts: columns = [loadopt.local_opts["expression"]] elif self._have_default_expression: columns = self.parent_property.columns if columns is None: return for c in columns: if adapter: c = adapter.columns[c] column_collection.append(c) fetch = columns[0] if adapter: fetch = adapter.columns[fetch] memoized_populators[self.parent_property] = fetch def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): # look through list of columns represented here # to see which, if any, is present in the row. if loadopt and "expression" in loadopt.local_opts: columns = [loadopt.local_opts["expression"]] for col in columns: if adapter: col = adapter.columns[col] getter = result._getter(col, False) if getter: populators["quick"].append((self.key, getter)) break else: populators["expire"].append((self.key, True)) def init_class_attribute(self, mapper): self.is_class_level = True _register_attribute( self.parent_property, mapper, useobject=False, compare_function=self.columns[0].type.compare_values, accepts_scalar_loader=False, ) @log.class_logger @properties.ColumnProperty.strategy_for(deferred=True, instrument=True) @properties.ColumnProperty.strategy_for(do_nothing=True) class DeferredColumnLoader(LoaderStrategy): """Provide loading behavior for a deferred :class:`.ColumnProperty`.""" __slots__ = "columns", "group" def __init__(self, parent, strategy_key): super(DeferredColumnLoader, self).__init__(parent, strategy_key) if hasattr(self.parent_property, "composite_class"): raise NotImplementedError( "Deferred loading for composite " "types not implemented yet" ) self.columns = self.parent_property.columns self.group = self.parent_property.group def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): # this path currently does not check the result # for the column; this is because in most cases we are # working just with the setup_query() directive which does # not support this, and the behavior here should be consistent. if not self.is_class_level: set_deferred_for_local_state = ( self.parent_property._deferred_column_loader ) populators["new"].append((self.key, set_deferred_for_local_state)) else: populators["expire"].append((self.key, False)) def init_class_attribute(self, mapper): self.is_class_level = True _register_attribute( self.parent_property, mapper, useobject=False, compare_function=self.columns[0].type.compare_values, callable_=self._load_for_state, expire_missing=False, ) def setup_query( self, context, query_entity, path, loadopt, adapter, column_collection, memoized_populators, only_load_props=None, **kw ): if ( ( loadopt and "undefer_pks" in loadopt.local_opts and set(self.columns).intersection( self.parent._should_undefer_in_wildcard ) ) or ( loadopt and self.group and loadopt.local_opts.get( "undefer_group_%s" % self.group, False ) ) or (only_load_props and self.key in only_load_props) ): self.parent_property._get_strategy( (("deferred", False), ("instrument", True)) ).setup_query( context, query_entity, path, loadopt, adapter, column_collection, memoized_populators, **kw ) elif self.is_class_level: memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED else: memoized_populators[self.parent_property] = _DEFER_FOR_STATE def _load_for_state(self, state, passive): if not state.key: return attributes.ATTR_EMPTY if not passive & attributes.SQL_OK: return attributes.PASSIVE_NO_RESULT localparent = state.manager.mapper if self.group: toload = [ p.key for p in localparent.iterate_properties if isinstance(p, StrategizedProperty) and isinstance(p.strategy, DeferredColumnLoader) and p.group == self.group ] else: toload = [self.key] # narrow the keys down to just those which have no history group = [k for k in toload if k in state.unmodified] session = _state_session(state) if session is None: raise orm_exc.DetachedInstanceError( "Parent instance %s is not bound to a Session; " "deferred load operation of attribute '%s' cannot proceed" % (orm_util.state_str(state), self.key) ) query = session.query(localparent) if ( loading.load_on_ident( query, state.key, only_load_props=group, refresh_state=state ) is None ): raise orm_exc.ObjectDeletedError(state) return attributes.ATTR_WAS_SET class LoadDeferredColumns(object): """serializable loader object used by DeferredColumnLoader""" def __init__(self, key): self.key = key def __call__(self, state, passive=attributes.PASSIVE_OFF): key = self.key localparent = state.manager.mapper prop = localparent._props[key] strategy = prop._strategies[DeferredColumnLoader] return strategy._load_for_state(state, passive) class AbstractRelationshipLoader(LoaderStrategy): """LoaderStratgies which deal with related objects.""" __slots__ = "mapper", "target", "uselist", "entity" def __init__(self, parent, strategy_key): super(AbstractRelationshipLoader, self).__init__(parent, strategy_key) self.mapper = self.parent_property.mapper self.entity = self.parent_property.entity self.target = self.parent_property.target self.uselist = self.parent_property.uselist @log.class_logger @properties.RelationshipProperty.strategy_for(do_nothing=True) class DoNothingLoader(LoaderStrategy): """Relationship loader that makes no change to the object's state. Compared to NoLoader, this loader does not initialize the collection/attribute to empty/none; the usual default LazyLoader will take effect. """ @log.class_logger @properties.RelationshipProperty.strategy_for(lazy="noload") @properties.RelationshipProperty.strategy_for(lazy=None) class NoLoader(AbstractRelationshipLoader): """Provide loading behavior for a :class:`.RelationshipProperty` with "lazy=None". """ __slots__ = () def init_class_attribute(self, mapper): self.is_class_level = True _register_attribute( self.parent_property, mapper, useobject=True, typecallable=self.parent_property.collection_class, ) def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): def invoke_no_load(state, dict_, row): if self.uselist: state.manager.get_impl(self.key).initialize(state, dict_) else: dict_[self.key] = None populators["new"].append((self.key, invoke_no_load)) @log.class_logger @properties.RelationshipProperty.strategy_for(lazy=True) @properties.RelationshipProperty.strategy_for(lazy="select") @properties.RelationshipProperty.strategy_for(lazy="raise") @properties.RelationshipProperty.strategy_for(lazy="raise_on_sql") @properties.RelationshipProperty.strategy_for(lazy="baked_select") class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots): """Provide loading behavior for a :class:`.RelationshipProperty` with "lazy=True", that is loads when first accessed. """ __slots__ = ( "_lazywhere", "_rev_lazywhere", "use_get", "is_aliased_class", "_bind_to_col", "_equated_columns", "_rev_bind_to_col", "_rev_equated_columns", "_simple_lazy_clause", "_raise_always", "_raise_on_sql", "_bakery", ) def __init__(self, parent, strategy_key): super(LazyLoader, self).__init__(parent, strategy_key) self._raise_always = self.strategy_opts["lazy"] == "raise" self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql" self.is_aliased_class = inspect(self.entity).is_aliased_class join_condition = self.parent_property._join_condition ( self._lazywhere, self._bind_to_col, self._equated_columns, ) = join_condition.create_lazy_clause() ( self._rev_lazywhere, self._rev_bind_to_col, self._rev_equated_columns, ) = join_condition.create_lazy_clause(reverse_direction=True) self.logger.info("%s lazy loading clause %s", self, self._lazywhere) # determine if our "lazywhere" clause is the same as the mapper's # get() clause. then we can just use mapper.get() # # TODO: the "not self.uselist" can be taken out entirely; a m2o # load that populates for a list (very unusual, but is possible with # the API) can still set for "None" and the attribute system will # populate as an empty list. self.use_get = ( not self.is_aliased_class and not self.uselist and self.entity._get_clause[0].compare( self._lazywhere, use_proxies=True, equivalents=self.mapper._equivalent_columns, ) ) if self.use_get: for col in list(self._equated_columns): if col in self.mapper._equivalent_columns: for c in self.mapper._equivalent_columns[col]: self._equated_columns[c] = self._equated_columns[col] self.logger.info( "%s will use query.get() to " "optimize instance loads", self ) def init_class_attribute(self, mapper): self.is_class_level = True active_history = ( self.parent_property.active_history or self.parent_property.direction is not interfaces.MANYTOONE or not self.use_get ) # MANYTOONE currently only needs the # "old" value for delete-orphan # cascades. the required _SingleParentValidator # will enable active_history # in that case. otherwise we don't need the # "old" value during backref operations. _register_attribute( self.parent_property, mapper, useobject=True, callable_=self._load_for_state, typecallable=self.parent_property.collection_class, active_history=active_history, ) def _memoized_attr__simple_lazy_clause(self): criterion, bind_to_col = (self._lazywhere, self._bind_to_col) params = [] def visit_bindparam(bindparam): bindparam.unique = False visitors.traverse(criterion, {}, {"bindparam": visit_bindparam}) def visit_bindparam(bindparam): if bindparam._identifying_key in bind_to_col: params.append( ( bindparam.key, bind_to_col[bindparam._identifying_key], None, ) ) elif bindparam.callable is None: params.append((bindparam.key, None, bindparam.value)) criterion = visitors.cloned_traverse( criterion, {}, {"bindparam": visit_bindparam} ) return criterion, params def _generate_lazy_clause(self, state, passive): criterion, param_keys = self._simple_lazy_clause if state is None: return sql_util.adapt_criterion_to_null( criterion, [key for key, ident, value in param_keys] ) mapper = self.parent_property.parent o = state.obj() # strong ref dict_ = attributes.instance_dict(o) if passive & attributes.INIT_OK: passive ^= attributes.INIT_OK params = {} for key, ident, value in param_keys: if ident is not None: if passive and passive & attributes.LOAD_AGAINST_COMMITTED: value = mapper._get_committed_state_attr_by_column( state, dict_, ident, passive ) else: value = mapper._get_state_attr_by_column( state, dict_, ident, passive ) params[key] = value return criterion, params def _invoke_raise_load(self, state, passive, lazy): raise sa_exc.InvalidRequestError( "'%s' is not available due to lazy='%s'" % (self, lazy) ) def _load_for_state(self, state, passive): if not state.key and ( ( not self.parent_property.load_on_pending and not state._load_pending ) or not state.session_id ): return attributes.ATTR_EMPTY pending = not state.key primary_key_identity = None if (not passive & attributes.SQL_OK and not self.use_get) or ( not passive & attributes.NON_PERSISTENT_OK and pending ): return attributes.PASSIVE_NO_RESULT if ( # we were given lazy="raise" self._raise_always # the no_raise history-related flag was not passed and not passive & attributes.NO_RAISE and ( # if we are use_get and related_object_ok is disabled, # which means we are at most looking in the identity map # for history purposes or otherwise returning # PASSIVE_NO_RESULT, don't raise. This is also a # history-related flag not self.use_get or passive & attributes.RELATED_OBJECT_OK ) ): self._invoke_raise_load(state, passive, "raise") session = _state_session(state) if not session: if passive & attributes.NO_RAISE: return attributes.PASSIVE_NO_RESULT raise orm_exc.DetachedInstanceError( "Parent instance %s is not bound to a Session; " "lazy load operation of attribute '%s' cannot proceed" % (orm_util.state_str(state), self.key) ) # if we have a simple primary key load, check the # identity map without generating a Query at all if self.use_get: primary_key_identity = self._get_ident_for_use_get( session, state, passive ) if attributes.PASSIVE_NO_RESULT in primary_key_identity: return attributes.PASSIVE_NO_RESULT elif attributes.NEVER_SET in primary_key_identity: return attributes.NEVER_SET if _none_set.issuperset(primary_key_identity): return None # look for this identity in the identity map. Delegate to the # Query class in use, as it may have special rules for how it # does this, including how it decides what the correct # identity_token would be for this identity. instance = session.query()._identity_lookup( self.entity, primary_key_identity, passive=passive, lazy_loaded_from=state, ) if instance is not None: if instance is attributes.PASSIVE_CLASS_MISMATCH: return None else: return instance elif ( not passive & attributes.SQL_OK or not passive & attributes.RELATED_OBJECT_OK ): return attributes.PASSIVE_NO_RESULT return self._emit_lazyload( session, state, primary_key_identity, passive ) def _get_ident_for_use_get(self, session, state, passive): instance_mapper = state.manager.mapper if passive & attributes.LOAD_AGAINST_COMMITTED: get_attr = instance_mapper._get_committed_state_attr_by_column else: get_attr = instance_mapper._get_state_attr_by_column dict_ = state.dict return [ get_attr(state, dict_, self._equated_columns[pk], passive=passive) for pk in self.mapper.primary_key ] @util.dependencies("sqlalchemy.ext.baked") def _memoized_attr__bakery(self, baked): return baked.bakery(size=50) @util.dependencies("sqlalchemy.orm.strategy_options") def _emit_lazyload( self, strategy_options, session, state, primary_key_identity, passive ): # emit lazy load now using BakedQuery, to cut way down on the overhead # of generating queries. # there are two big things we are trying to guard against here: # # 1. two different lazy loads that need to have a different result, # being cached on the same key. The results between two lazy loads # can be different due to the options passed to the query, which # take effect for descendant objects. Therefore we have to make # sure paths and load options generate good cache keys, and if they # don't, we don't cache. # 2. a lazy load that gets cached on a key that includes some # "throwaway" object, like a per-query AliasedClass, meaning # the cache key will never be seen again and the cache itself # will fill up. (the cache is an LRU cache, so while we won't # run out of memory, it will perform terribly when it's full. A # warning is emitted if this occurs.) We must prevent the # generation of a cache key that is including a throwaway object # in the key. # note that "lazy='select'" and "lazy=True" make two separate # lazy loaders. Currently the LRU cache is local to the LazyLoader, # however add ourselves to the initial cache key just to future # proof in case it moves q = self._bakery(lambda session: session.query(self.entity), self) q.add_criteria( lambda q: q._adapt_all_clauses()._with_invoke_all_eagers(False), self.parent_property, ) if not self.parent_property.bake_queries: q.spoil(full=True) if self.parent_property.secondary is not None: q.add_criteria( lambda q: q.select_from( self.mapper, self.parent_property.secondary ) ) pending = not state.key # don't autoflush on pending if pending or passive & attributes.NO_AUTOFLUSH: q.add_criteria(lambda q: q.autoflush(False)) if state.load_options: # here, if any of the options cannot return a cache key, # the BakedQuery "spoils" and caching will not occur. a path # that features Cls.attribute.of_type(some_alias) will cancel # caching, for example, since "some_alias" is user-defined and # is usually a throwaway object. effective_path = state.load_path[self.parent_property] q._add_lazyload_options(state.load_options, effective_path) if self.use_get: if self._raise_on_sql: self._invoke_raise_load(state, passive, "raise_on_sql") return ( q(session) .with_post_criteria(lambda q: q._set_lazyload_from(state)) ._load_on_pk_identity( session.query(self.mapper), primary_key_identity ) ) if self.parent_property.order_by: q.add_criteria( lambda q: q.order_by( *util.to_list(self.parent_property.order_by) ) ) for rev in self.parent_property._reverse_property: # reverse props that are MANYTOONE are loading *this* # object from get(), so don't need to eager out to those. if ( rev.direction is interfaces.MANYTOONE and rev._use_get and not isinstance(rev.strategy, LazyLoader) ): q.add_criteria( lambda q: q.options( strategy_options.Load.for_existing_path( q._current_path[rev.parent] ).lazyload(rev.key) ) ) lazy_clause, params = self._generate_lazy_clause(state, passive) if pending: if util.has_intersection(orm_util._none_set, params.values()): return None elif util.has_intersection(orm_util._never_set, params.values()): return None if self._raise_on_sql: self._invoke_raise_load(state, passive, "raise_on_sql") q.add_criteria(lambda q: q.filter(lazy_clause)) # set parameters in the query such that we don't overwrite # parameters that are already set within it def set_default_params(q): params.update(q._params) q._params = params return q result = ( q(session) .with_post_criteria(lambda q: q._set_lazyload_from(state)) .with_post_criteria(set_default_params) .all() ) if self.uselist: return result else: l = len(result) if l: if l > 1: util.warn( "Multiple rows returned with " "uselist=False for lazily-loaded attribute '%s' " % self.parent_property ) return result[0] else: return None def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): key = self.key if not self.is_class_level: # we are not the primary manager for this attribute # on this class - set up a # per-instance lazyloader, which will override the # class-level behavior. # this currently only happens when using a # "lazyload" option on a "no load" # attribute - "eager" attributes always have a # class-level lazyloader installed. set_lazy_callable = ( InstanceState._instance_level_callable_processor )(mapper.class_manager, LoadLazyAttribute(key, self), key) populators["new"].append((self.key, set_lazy_callable)) elif context.populate_existing or mapper.always_refresh: def reset_for_lazy_callable(state, dict_, row): # we are the primary manager for this attribute on # this class - reset its # per-instance attribute state, so that the class-level # lazy loader is # executed when next referenced on this instance. # this is needed in # populate_existing() types of scenarios to reset # any existing state. state._reset(dict_, key) populators["new"].append((self.key, reset_for_lazy_callable)) class LoadLazyAttribute(object): """serializable loader object used by LazyLoader""" def __init__(self, key, initiating_strategy): self.key = key self.strategy_key = initiating_strategy.strategy_key def __call__(self, state, passive=attributes.PASSIVE_OFF): key = self.key instance_mapper = state.manager.mapper prop = instance_mapper._props[key] strategy = prop._strategies[self.strategy_key] return strategy._load_for_state(state, passive) @properties.RelationshipProperty.strategy_for(lazy="immediate") class ImmediateLoader(AbstractRelationshipLoader): __slots__ = () def init_class_attribute(self, mapper): self.parent_property._get_strategy( (("lazy", "select"),) ).init_class_attribute(mapper) def setup_query( self, context, entity, path, loadopt, adapter, column_collection=None, parentmapper=None, **kwargs ): pass def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): def load_immediate(state, dict_, row): state.get_impl(self.key).get(state, dict_) populators["delayed"].append((self.key, load_immediate)) @log.class_logger @properties.RelationshipProperty.strategy_for(lazy="subquery") class SubqueryLoader(AbstractRelationshipLoader): __slots__ = ("join_depth",) def __init__(self, parent, strategy_key): super(SubqueryLoader, self).__init__(parent, strategy_key) self.join_depth = self.parent_property.join_depth def init_class_attribute(self, mapper): self.parent_property._get_strategy( (("lazy", "select"),) ).init_class_attribute(mapper) def setup_query( self, context, entity, path, loadopt, adapter, column_collection=None, parentmapper=None, **kwargs ): if not context.query._enable_eagerloads: return elif context.query._yield_per: context.query._no_yield_per("subquery") path = path[self.parent_property] # build up a path indicating the path from the leftmost # entity to the thing we're subquery loading. with_poly_entity = path.get( context.attributes, "path_with_polymorphic", None ) if with_poly_entity is not None: effective_entity = with_poly_entity else: effective_entity = self.entity subq_path = context.attributes.get( ("subquery_path", None), orm_util.PathRegistry.root ) subq_path = subq_path + path # if not via query option, check for # a cycle if not path.contains(context.attributes, "loader"): if self.join_depth: if ( ( context.query._current_path.length if context.query._current_path else 0 ) + path.length ) / 2 > self.join_depth: return elif subq_path.contains_mapper(self.mapper): return ( leftmost_mapper, leftmost_attr, leftmost_relationship, ) = self._get_leftmost(subq_path) orig_query = context.attributes.get( ("orig_query", SubqueryLoader), context.query ) # generate a new Query from the original, then # produce a subquery from it. left_alias = self._generate_from_original_query( orig_query, leftmost_mapper, leftmost_attr, leftmost_relationship, entity.entity_zero, ) # generate another Query that will join the # left alias to the target relationships. # basically doing a longhand # "from_self()". (from_self() itself not quite industrial # strength enough for all contingencies...but very close) q = orig_query.session.query(effective_entity) q._attributes = { ("orig_query", SubqueryLoader): orig_query, ("subquery_path", None): subq_path, } q = q._set_enable_single_crit(False) to_join, local_attr, parent_alias = self._prep_for_joins( left_alias, subq_path ) q = q.add_columns(*local_attr) q = self._apply_joins( q, to_join, left_alias, parent_alias, effective_entity ) q = self._setup_options(q, subq_path, orig_query, effective_entity) q = self._setup_outermost_orderby(q) # add new query to attributes to be picked up # by create_row_processor path.set(context.attributes, "subquery", q) def _get_leftmost(self, subq_path): subq_path = subq_path.path subq_mapper = orm_util._class_to_mapper(subq_path[0]) # determine attributes of the leftmost mapper if ( self.parent.isa(subq_mapper) and self.parent_property is subq_path[1] ): leftmost_mapper, leftmost_prop = self.parent, self.parent_property else: leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1] leftmost_cols = leftmost_prop.local_columns leftmost_attr = [ getattr( subq_path[0].entity, leftmost_mapper._columntoproperty[c].key ) for c in leftmost_cols ] return leftmost_mapper, leftmost_attr, leftmost_prop def _generate_from_original_query( self, orig_query, leftmost_mapper, leftmost_attr, leftmost_relationship, orig_entity, ): # reformat the original query # to look only for significant columns q = orig_query._clone().correlate(None) # set the query's "FROM" list explicitly to what the # FROM list would be in any case, as we will be limiting # the columns in the SELECT list which may no longer include # all entities mentioned in things like WHERE, JOIN, etc. if not q._from_obj: q._set_select_from( list( set( [ ent["entity"] for ent in orig_query.column_descriptions if ent["entity"] is not None ] ) ), False, ) # select from the identity columns of the outer (specifically, these # are the 'local_cols' of the property). This will remove # other columns from the query that might suggest the right entity # which is why we do _set_select_from above. target_cols = q._adapt_col_list(leftmost_attr) q._set_entities(target_cols) distinct_target_key = leftmost_relationship.distinct_target_key if distinct_target_key is True: q._distinct = True elif distinct_target_key is None: # if target_cols refer to a non-primary key or only # part of a composite primary key, set the q as distinct for t in set(c.table for c in target_cols): if not set(target_cols).issuperset(t.primary_key): q._distinct = True break if q._order_by is False: q._order_by = leftmost_mapper.order_by # don't need ORDER BY if no limit/offset if q._limit is None and q._offset is None: q._order_by = None # the original query now becomes a subquery # which we'll join onto. embed_q = q.with_labels().subquery() left_alias = orm_util.AliasedClass( leftmost_mapper, embed_q, use_mapper_path=True ) return left_alias def _prep_for_joins(self, left_alias, subq_path): # figure out what's being joined. a.k.a. the fun part to_join = [] pairs = list(subq_path.pairs()) for i, (mapper, prop) in enumerate(pairs): if i > 0: # look at the previous mapper in the chain - # if it is as or more specific than this prop's # mapper, use that instead. # note we have an assumption here that # the non-first element is always going to be a mapper, # not an AliasedClass prev_mapper = pairs[i - 1][1].mapper to_append = prev_mapper if prev_mapper.isa(mapper) else mapper else: to_append = mapper to_join.append((to_append, prop.key)) # determine the immediate parent class we are joining from, # which needs to be aliased. if len(to_join) < 2: # in the case of a one level eager load, this is the # leftmost "left_alias". parent_alias = left_alias else: info = inspect(to_join[-1][0]) if info.is_aliased_class: parent_alias = info.entity else: # alias a plain mapper as we may be # joining multiple times parent_alias = orm_util.AliasedClass( info.entity, use_mapper_path=True ) local_cols = self.parent_property.local_columns local_attr = [ getattr(parent_alias, self.parent._columntoproperty[c].key) for c in local_cols ] return to_join, local_attr, parent_alias def _apply_joins( self, q, to_join, left_alias, parent_alias, effective_entity ): ltj = len(to_join) if ltj == 1: to_join = [ getattr(left_alias, to_join[0][1]).of_type(effective_entity) ] elif ltj == 2: to_join = [ getattr(left_alias, to_join[0][1]).of_type(parent_alias), getattr(parent_alias, to_join[-1][1]).of_type( effective_entity ), ] elif ltj > 2: middle = [ ( orm_util.AliasedClass(item[0]) if not inspect(item[0]).is_aliased_class else item[0].entity, item[1], ) for item in to_join[1:-1] ] inner = [] while middle: item = middle.pop(0) attr = getattr(item[0], item[1]) if middle: attr = attr.of_type(middle[0][0]) else: attr = attr.of_type(parent_alias) inner.append(attr) to_join = ( [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)] + inner + [ getattr(parent_alias, to_join[-1][1]).of_type( effective_entity ) ] ) for attr in to_join: q = q.join(attr, from_joinpoint=True) return q def _setup_options(self, q, subq_path, orig_query, effective_entity): # propagate loader options etc. to the new query. # these will fire relative to subq_path. q = q._with_current_path(subq_path) q = q._conditional_options(*orig_query._with_options) if orig_query._populate_existing: q._populate_existing = orig_query._populate_existing return q def _setup_outermost_orderby(self, q): if self.parent_property.order_by: # if there's an ORDER BY, alias it the same # way joinedloader does, but we have to pull out # the "eagerjoin" from the query. # this really only picks up the "secondary" table # right now. eagerjoin = q._from_obj[0] eager_order_by = eagerjoin._target_adapter.copy_and_process( util.to_list(self.parent_property.order_by) ) q = q.order_by(*eager_order_by) return q class _SubqCollections(object): """Given a :class:`_query.Query` used to emit the "subquery load", provide a load interface that executes the query at the first moment a value is needed. """ _data = None def __init__(self, subq): self.subq = subq def get(self, key, default): if self._data is None: self._load() return self._data.get(key, default) def _load(self): self._data = collections.defaultdict(list) for k, v in itertools.groupby(self.subq, lambda x: x[1:]): self._data[k].extend(vv[0] for vv in v) def loader(self, state, dict_, row): if self._data is None: self._load() def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): if not self.parent.class_manager[self.key].impl.supports_population: raise sa_exc.InvalidRequestError( "'%s' does not support object " "population - eager loading cannot be applied." % self ) path = path[self.parent_property] subq = path.get(context.attributes, "subquery") if subq is None: return assert subq.session is context.session, ( "Subquery session doesn't refer to that of " "our context. Are there broken context caching " "schemes being used?" ) local_cols = self.parent_property.local_columns # cache the loaded collections in the context # so that inheriting mappers don't re-load when they # call upon create_row_processor again collections = path.get(context.attributes, "collections") if collections is None: collections = self._SubqCollections(subq) path.set(context.attributes, "collections", collections) if adapter: local_cols = [adapter.columns[c] for c in local_cols] if self.uselist: self._create_collection_loader( context, collections, local_cols, populators ) else: self._create_scalar_loader( context, collections, local_cols, populators ) def _create_collection_loader( self, context, collections, local_cols, populators ): def load_collection_from_subq(state, dict_, row): collection = collections.get( tuple([row[col] for col in local_cols]), () ) state.get_impl(self.key).set_committed_value( state, dict_, collection ) def load_collection_from_subq_existing_row(state, dict_, row): if self.key not in dict_: load_collection_from_subq(state, dict_, row) populators["new"].append((self.key, load_collection_from_subq)) populators["existing"].append( (self.key, load_collection_from_subq_existing_row) ) if context.invoke_all_eagers: populators["eager"].append((self.key, collections.loader)) def _create_scalar_loader( self, context, collections, local_cols, populators ): def load_scalar_from_subq(state, dict_, row): collection = collections.get( tuple([row[col] for col in local_cols]), (None,) ) if len(collection) > 1: util.warn( "Multiple rows returned with " "uselist=False for eagerly-loaded attribute '%s' " % self ) scalar = collection[0] state.get_impl(self.key).set_committed_value(state, dict_, scalar) def load_scalar_from_subq_existing_row(state, dict_, row): if self.key not in dict_: load_scalar_from_subq(state, dict_, row) populators["new"].append((self.key, load_scalar_from_subq)) populators["existing"].append( (self.key, load_scalar_from_subq_existing_row) ) if context.invoke_all_eagers: populators["eager"].append((self.key, collections.loader)) @log.class_logger @properties.RelationshipProperty.strategy_for(lazy="joined") @properties.RelationshipProperty.strategy_for(lazy=False) class JoinedLoader(AbstractRelationshipLoader): """Provide loading behavior for a :class:`.RelationshipProperty` using joined eager loading. """ __slots__ = "join_depth", "_aliased_class_pool" def __init__(self, parent, strategy_key): super(JoinedLoader, self).__init__(parent, strategy_key) self.join_depth = self.parent_property.join_depth self._aliased_class_pool = [] def init_class_attribute(self, mapper): self.parent_property._get_strategy( (("lazy", "select"),) ).init_class_attribute(mapper) def setup_query( self, context, query_entity, path, loadopt, adapter, column_collection=None, parentmapper=None, chained_from_outerjoin=False, **kwargs ): """Add a left outer join to the statement that's being constructed.""" if not context.query._enable_eagerloads: return elif context.query._yield_per and self.uselist: context.query._no_yield_per("joined collection") path = path[self.parent_property] with_polymorphic = None user_defined_adapter = ( self._init_user_defined_eager_proc(loadopt, context) if loadopt else False ) if user_defined_adapter is not False: ( clauses, adapter, add_to_collection, ) = self._setup_query_on_user_defined_adapter( context, query_entity, path, adapter, user_defined_adapter ) else: # if not via query option, check for # a cycle if not path.contains(context.attributes, "loader"): if self.join_depth: if path.length / 2 > self.join_depth: return elif path.contains_mapper(self.mapper): return ( clauses, adapter, add_to_collection, chained_from_outerjoin, ) = self._generate_row_adapter( context, query_entity, path, loadopt, adapter, column_collection, parentmapper, chained_from_outerjoin, ) with_poly_entity = path.get( context.attributes, "path_with_polymorphic", None ) if with_poly_entity is not None: with_polymorphic = inspect( with_poly_entity ).with_polymorphic_mappers else: with_polymorphic = None path = path[self.entity] loading._setup_entity_query( context, self.mapper, query_entity, path, clauses, add_to_collection, with_polymorphic=with_polymorphic, parentmapper=self.mapper, chained_from_outerjoin=chained_from_outerjoin, ) if with_poly_entity is not None and None in set( context.secondary_columns ): raise sa_exc.InvalidRequestError( "Detected unaliased columns when generating joined " "load. Make sure to use aliased=True or flat=True " "when using joined loading with with_polymorphic()." ) def _init_user_defined_eager_proc(self, loadopt, context): # check if the opt applies at all if "eager_from_alias" not in loadopt.local_opts: # nope return False path = loadopt.path.parent # the option applies. check if the "user_defined_eager_row_processor" # has been built up. adapter = path.get( context.attributes, "user_defined_eager_row_processor", False ) if adapter is not False: # just return it return adapter # otherwise figure it out. alias = loadopt.local_opts["eager_from_alias"] root_mapper, prop = path[-2:] if alias is not None: if isinstance(alias, str): alias = prop.target.alias(alias) adapter = sql_util.ColumnAdapter( alias, equivalents=prop.mapper._equivalent_columns ) else: if path.contains(context.attributes, "path_with_polymorphic"): with_poly_entity = path.get( context.attributes, "path_with_polymorphic" ) adapter = orm_util.ORMAdapter( with_poly_entity, equivalents=prop.mapper._equivalent_columns, ) else: adapter = context.query._polymorphic_adapters.get( prop.mapper, None ) path.set( context.attributes, "user_defined_eager_row_processor", adapter ) return adapter def _setup_query_on_user_defined_adapter( self, context, entity, path, adapter, user_defined_adapter ): # apply some more wrapping to the "user defined adapter" # if we are setting up the query for SQL render. adapter = entity._get_entity_clauses(context.query, context) if adapter and user_defined_adapter: user_defined_adapter = user_defined_adapter.wrap(adapter) path.set( context.attributes, "user_defined_eager_row_processor", user_defined_adapter, ) elif adapter: user_defined_adapter = adapter path.set( context.attributes, "user_defined_eager_row_processor", user_defined_adapter, ) add_to_collection = context.primary_columns return user_defined_adapter, adapter, add_to_collection def _gen_pooled_aliased_class(self, context): # keep a local pool of AliasedClass objects that get re-used. # we need one unique AliasedClass per query per appearance of our # entity in the query. if inspect(self.entity).is_aliased_class: alt_selectable = inspect(self.entity).selectable else: alt_selectable = None key = ("joinedloader_ac", self) if key not in context.attributes: context.attributes[key] = idx = 0 else: context.attributes[key] = idx = context.attributes[key] + 1 if idx >= len(self._aliased_class_pool): to_adapt = orm_util.AliasedClass( self.mapper, alias=alt_selectable.alias(flat=True) if alt_selectable is not None else None, flat=True, use_mapper_path=True, ) # load up the .columns collection on the Alias() before # the object becomes shared among threads. this prevents # races for column identities. inspect(to_adapt).selectable.c self._aliased_class_pool.append(to_adapt) return self._aliased_class_pool[idx] def _generate_row_adapter( self, context, entity, path, loadopt, adapter, column_collection, parentmapper, chained_from_outerjoin, ): with_poly_entity = path.get( context.attributes, "path_with_polymorphic", None ) if with_poly_entity: to_adapt = with_poly_entity else: to_adapt = self._gen_pooled_aliased_class(context) clauses = inspect(to_adapt)._memo( ("joinedloader_ormadapter", self), orm_util.ORMAdapter, to_adapt, equivalents=self.mapper._equivalent_columns, adapt_required=True, allow_label_resolve=False, anonymize_labels=True, ) assert clauses.aliased_class is not None if self.parent_property.uselist: context.multi_row_eager_loaders = True innerjoin = ( loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin) if loadopt is not None else self.parent_property.innerjoin ) if not innerjoin: # if this is an outer join, all non-nested eager joins from # this path must also be outer joins chained_from_outerjoin = True context.create_eager_joins.append( ( self._create_eager_join, entity, path, adapter, parentmapper, clauses, innerjoin, chained_from_outerjoin, ) ) add_to_collection = context.secondary_columns path.set(context.attributes, "eager_row_processor", clauses) return clauses, adapter, add_to_collection, chained_from_outerjoin def _create_eager_join( self, context, query_entity, path, adapter, parentmapper, clauses, innerjoin, chained_from_outerjoin, ): if parentmapper is None: localparent = query_entity.mapper else: localparent = parentmapper # whether or not the Query will wrap the selectable in a subquery, # and then attach eager load joins to that (i.e., in the case of # LIMIT/OFFSET etc.) should_nest_selectable = ( context.multi_row_eager_loaders and context.query._should_nest_selectable ) query_entity_key = None if ( query_entity not in context.eager_joins and not should_nest_selectable and context.from_clause ): indexes = sql_util.find_left_clause_that_matches_given( context.from_clause, query_entity.selectable ) if len(indexes) > 1: # for the eager load case, I can't reproduce this right # now. For query.join() I can. raise sa_exc.InvalidRequestError( "Can't identify which query entity in which to joined " "eager load from. Please use an exact match when " "specifying the join path." ) if indexes: clause = context.from_clause[indexes[0]] # join to an existing FROM clause on the query. # key it to its list index in the eager_joins dict. # Query._compile_context will adapt as needed and # append to the FROM clause of the select(). query_entity_key, default_towrap = indexes[0], clause if query_entity_key is None: query_entity_key, default_towrap = ( query_entity, query_entity.selectable, ) towrap = context.eager_joins.setdefault( query_entity_key, default_towrap ) if adapter: if getattr(adapter, "aliased_class", None): # joining from an adapted entity. The adapted entity # might be a "with_polymorphic", so resolve that to our # specific mapper's entity before looking for our attribute # name on it. efm = inspect(adapter.aliased_class)._entity_for_mapper( localparent if localparent.isa(self.parent) else self.parent ) # look for our attribute on the adapted entity, else fall back # to our straight property onclause = getattr(efm.entity, self.key, self.parent_property) else: onclause = getattr( orm_util.AliasedClass( self.parent, adapter.selectable, use_mapper_path=True ), self.key, self.parent_property, ) else: onclause = self.parent_property assert clauses.aliased_class is not None attach_on_outside = ( not chained_from_outerjoin or not innerjoin or innerjoin == "unnested" or query_entity.entity_zero.represents_outer_join ) if attach_on_outside: # this is the "classic" eager join case. eagerjoin = orm_util._ORMJoin( towrap, clauses.aliased_class, onclause, isouter=not innerjoin or query_entity.entity_zero.represents_outer_join or (chained_from_outerjoin and isinstance(towrap, sql.Join)), _left_memo=self.parent, _right_memo=self.mapper, ) else: # all other cases are innerjoin=='nested' approach eagerjoin = self._splice_nested_inner_join( path, towrap, clauses, onclause ) context.eager_joins[query_entity_key] = eagerjoin # send a hint to the Query as to where it may "splice" this join eagerjoin.stop_on = query_entity.selectable if not parentmapper: # for parentclause that is the non-eager end of the join, # ensure all the parent cols in the primaryjoin are actually # in the # columns clause (i.e. are not deferred), so that aliasing applied # by the Query propagates those columns outward. # This has the effect # of "undefering" those columns. for col in sql_util._find_columns( self.parent_property.primaryjoin ): if localparent.persist_selectable.c.contains_column(col): if adapter: col = adapter.columns[col] context.primary_columns.append(col) if self.parent_property.order_by: context.eager_order_by += ( eagerjoin._target_adapter.copy_and_process )(util.to_list(self.parent_property.order_by)) def _splice_nested_inner_join( self, path, join_obj, clauses, onclause, splicing=False ): if splicing is False: # first call is always handed a join object # from the outside assert isinstance(join_obj, orm_util._ORMJoin) elif isinstance(join_obj, sql.selectable.FromGrouping): return self._splice_nested_inner_join( path, join_obj.element, clauses, onclause, splicing ) elif not isinstance(join_obj, orm_util._ORMJoin): if path[-2] is splicing: return orm_util._ORMJoin( join_obj, clauses.aliased_class, onclause, isouter=False, _left_memo=splicing, _right_memo=path[-1].mapper, ) else: # only here if splicing == True return None target_join = self._splice_nested_inner_join( path, join_obj.right, clauses, onclause, join_obj._right_memo ) if target_join is None: right_splice = False target_join = self._splice_nested_inner_join( path, join_obj.left, clauses, onclause, join_obj._left_memo ) if target_join is None: # should only return None when recursively called, # e.g. splicing==True assert ( splicing is not False ), "assertion failed attempting to produce joined eager loads" return None else: right_splice = True if right_splice: # for a right splice, attempt to flatten out # a JOIN b JOIN c JOIN .. to avoid needless # parenthesis nesting if not join_obj.isouter and not target_join.isouter: eagerjoin = join_obj._splice_into_center(target_join) else: eagerjoin = orm_util._ORMJoin( join_obj.left, target_join, join_obj.onclause, isouter=join_obj.isouter, _left_memo=join_obj._left_memo, ) else: eagerjoin = orm_util._ORMJoin( target_join, join_obj.right, join_obj.onclause, isouter=join_obj.isouter, _right_memo=join_obj._right_memo, ) eagerjoin._target_adapter = target_join._target_adapter return eagerjoin def _create_eager_adapter(self, context, result, adapter, path, loadopt): user_defined_adapter = ( self._init_user_defined_eager_proc(loadopt, context) if loadopt else False ) if user_defined_adapter is not False: decorator = user_defined_adapter # user defined eagerloads are part of the "primary" # portion of the load. # the adapters applied to the Query should be honored. if context.adapter and decorator: decorator = decorator.wrap(context.adapter) elif context.adapter: decorator = context.adapter else: decorator = path.get(context.attributes, "eager_row_processor") if decorator is None: return False if self.mapper._result_has_identity_key(result, decorator): return decorator else: # no identity key - don't return a row # processor, will cause a degrade to lazy return False def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): if not self.parent.class_manager[self.key].impl.supports_population: raise sa_exc.InvalidRequestError( "'%s' does not support object " "population - eager loading cannot be applied." % self ) our_path = path[self.parent_property] eager_adapter = self._create_eager_adapter( context, result, adapter, our_path, loadopt ) if eager_adapter is not False: key = self.key _instance = loading._instance_processor( self.mapper, context, result, our_path[self.entity], eager_adapter, ) if not self.uselist: self._create_scalar_loader(context, key, _instance, populators) else: self._create_collection_loader( context, key, _instance, populators ) else: self.parent_property._get_strategy( (("lazy", "select"),) ).create_row_processor( context, path, loadopt, mapper, result, adapter, populators ) def _create_collection_loader(self, context, key, _instance, populators): def load_collection_from_joined_new_row(state, dict_, row): collection = attributes.init_state_collection(state, dict_, key) result_list = util.UniqueAppender( collection, "append_without_event" ) context.attributes[(state, key)] = result_list inst = _instance(row) if inst is not None: result_list.append(inst) def load_collection_from_joined_existing_row(state, dict_, row): if (state, key) in context.attributes: result_list = context.attributes[(state, key)] else: # appender_key can be absent from context.attributes # with isnew=False when self-referential eager loading # is used; the same instance may be present in two # distinct sets of result columns collection = attributes.init_state_collection( state, dict_, key ) result_list = util.UniqueAppender( collection, "append_without_event" ) context.attributes[(state, key)] = result_list inst = _instance(row) if inst is not None: result_list.append(inst) def load_collection_from_joined_exec(state, dict_, row): _instance(row) populators["new"].append( (self.key, load_collection_from_joined_new_row) ) populators["existing"].append( (self.key, load_collection_from_joined_existing_row) ) if context.invoke_all_eagers: populators["eager"].append( (self.key, load_collection_from_joined_exec) ) def _create_scalar_loader(self, context, key, _instance, populators): def load_scalar_from_joined_new_row(state, dict_, row): # set a scalar object instance directly on the parent # object, bypassing InstrumentedAttribute event handlers. dict_[key] = _instance(row) def load_scalar_from_joined_existing_row(state, dict_, row): # call _instance on the row, even though the object has # been created, so that we further descend into properties existing = _instance(row) # conflicting value already loaded, this shouldn't happen if key in dict_: if existing is not dict_[key]: util.warn( "Multiple rows returned with " "uselist=False for eagerly-loaded attribute '%s' " % self ) else: # this case is when one row has multiple loads of the # same entity (e.g. via aliasing), one has an attribute # that the other doesn't. dict_[key] = existing def load_scalar_from_joined_exec(state, dict_, row): _instance(row) populators["new"].append((self.key, load_scalar_from_joined_new_row)) populators["existing"].append( (self.key, load_scalar_from_joined_existing_row) ) if context.invoke_all_eagers: populators["eager"].append( (self.key, load_scalar_from_joined_exec) ) @log.class_logger @properties.RelationshipProperty.strategy_for(lazy="selectin") class SelectInLoader(AbstractRelationshipLoader, util.MemoizedSlots): __slots__ = ( "join_depth", "omit_join", "_parent_alias", "_query_info", "_fallback_query_info", "_bakery", ) query_info = collections.namedtuple( "queryinfo", [ "load_only_child", "load_with_join", "in_expr", "pk_cols", "zero_idx", "child_lookup_cols", ], ) _chunksize = 500 def __init__(self, parent, strategy_key): super(SelectInLoader, self).__init__(parent, strategy_key) self.join_depth = self.parent_property.join_depth is_m2o = self.parent_property.direction is interfaces.MANYTOONE if self.parent_property.omit_join is not None: self.omit_join = self.parent_property.omit_join else: lazyloader = self.parent_property._get_strategy( (("lazy", "select"),) ) if is_m2o: self.omit_join = lazyloader.use_get else: self.omit_join = self.parent._get_clause[0].compare( lazyloader._rev_lazywhere, use_proxies=True, equivalents=self.parent._equivalent_columns, ) if self.omit_join: if is_m2o: self._query_info = self._init_for_omit_join_m2o() self._fallback_query_info = self._init_for_join() else: self._query_info = self._init_for_omit_join() else: self._query_info = self._init_for_join() def _init_for_omit_join(self): pk_to_fk = dict( self.parent_property._join_condition.local_remote_pairs ) pk_to_fk.update( (equiv, pk_to_fk[k]) for k in list(pk_to_fk) for equiv in self.parent._equivalent_columns.get(k, ()) ) pk_cols = fk_cols = [ pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk ] if len(fk_cols) > 1: in_expr = sql.tuple_(*fk_cols) zero_idx = False else: in_expr = fk_cols[0] zero_idx = True return self.query_info(False, False, in_expr, pk_cols, zero_idx, None) def _init_for_omit_join_m2o(self): pk_cols = self.mapper.primary_key if len(pk_cols) > 1: in_expr = sql.tuple_(*pk_cols) zero_idx = False else: in_expr = pk_cols[0] zero_idx = True lazyloader = self.parent_property._get_strategy((("lazy", "select"),)) lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols] return self.query_info( True, False, in_expr, pk_cols, zero_idx, lookup_cols ) def _init_for_join(self): self._parent_alias = aliased(self.parent.class_) pa_insp = inspect(self._parent_alias) pk_cols = [ pa_insp._adapt_element(col) for col in self.parent.primary_key ] if len(pk_cols) > 1: in_expr = sql.tuple_(*pk_cols) zero_idx = False else: in_expr = pk_cols[0] zero_idx = True return self.query_info(False, True, in_expr, pk_cols, zero_idx, None) def init_class_attribute(self, mapper): self.parent_property._get_strategy( (("lazy", "select"),) ).init_class_attribute(mapper) @util.dependencies("sqlalchemy.ext.baked") def _memoized_attr__bakery(self, baked): return baked.bakery(size=50) def create_row_processor( self, context, path, loadopt, mapper, result, adapter, populators ): if not self.parent.class_manager[self.key].impl.supports_population: raise sa_exc.InvalidRequestError( "'%s' does not support object " "population - eager loading cannot be applied." % self ) selectin_path = ( context.query._current_path or orm_util.PathRegistry.root ) + path if not orm_util._entity_isa(path[-1], self.parent): return if loading.PostLoad.path_exists( context, selectin_path, self.parent_property ): return path_w_prop = path[self.parent_property] selectin_path_w_prop = selectin_path[self.parent_property] # build up a path indicating the path from the leftmost # entity to the thing we're subquery loading. with_poly_entity = path_w_prop.get( context.attributes, "path_with_polymorphic", None ) if with_poly_entity is not None: effective_entity = with_poly_entity else: effective_entity = self.entity if not path_w_prop.contains(context.attributes, "loader"): if self.join_depth: if selectin_path_w_prop.length / 2 > self.join_depth: return elif selectin_path_w_prop.contains_mapper(self.mapper): return loading.PostLoad.callable_for_path( context, selectin_path, self.parent, self.parent_property, self._load_for_path, effective_entity, ) @util.dependencies("sqlalchemy.ext.baked") def _load_for_path( self, baked, context, path, states, load_only, effective_entity ): if load_only and self.key not in load_only: return query_info = self._query_info if query_info.load_only_child: our_states = collections.defaultdict(list) none_states = [] mapper = self.parent for state, overwrite in states: state_dict = state.dict related_ident = tuple( mapper._get_state_attr_by_column( state, state_dict, lk, passive=attributes.PASSIVE_NO_FETCH, ) for lk in query_info.child_lookup_cols ) # if the loaded parent objects do not have the foreign key # to the related item loaded, then degrade into the joined # version of selectinload if attributes.PASSIVE_NO_RESULT in related_ident: query_info = self._fallback_query_info break # organize states into lists keyed to particular foreign # key values. if None not in related_ident: our_states[related_ident].append( (state, state_dict, overwrite) ) else: # For FK values that have None, add them to a # separate collection that will be populated separately none_states.append((state, state_dict, overwrite)) # note the above conditional may have changed query_info if not query_info.load_only_child: our_states = [ (state.key[1], state, state.dict, overwrite) for state, overwrite in states ] pk_cols = query_info.pk_cols in_expr = query_info.in_expr if not query_info.load_with_join: # in "omit join" mode, the primary key column and the # "in" expression are in terms of the related entity. So # if the related entity is polymorphic or otherwise aliased, # we need to adapt our "pk_cols" and "in_expr" to that # entity. in non-"omit join" mode, these are against the # parent entity and do not need adaption. insp = inspect(effective_entity) if insp.is_aliased_class: pk_cols = [insp._adapt_element(col) for col in pk_cols] in_expr = insp._adapt_element(in_expr) pk_cols = [insp._adapt_element(col) for col in pk_cols] q = self._bakery( lambda session: session.query( query.Bundle("pk", *pk_cols), effective_entity ), self, ) if not query_info.load_with_join: # the Bundle we have in the "omit_join" case is against raw, non # annotated columns, so to ensure the Query knows its primary # entity, we add it explicitly. If we made the Bundle against # annotated columns, we hit a performance issue in this specific # case, which is detailed in issue #4347. q.add_criteria(lambda q: q.select_from(effective_entity)) else: # in the non-omit_join case, the Bundle is against the annotated/ # mapped column of the parent entity, but the #4347 issue does not # occur in this case. pa = self._parent_alias q.add_criteria( lambda q: q.select_from(pa).join( getattr(pa, self.parent_property.key).of_type( effective_entity ) ) ) if query_info.load_only_child: q.add_criteria( lambda q: q.filter( in_expr.in_(sql.bindparam("primary_keys", expanding=True)) ) ) else: q.add_criteria( lambda q: q.filter( in_expr.in_(sql.bindparam("primary_keys", expanding=True)) ) ) orig_query = context.query q._add_lazyload_options( orig_query._with_options, path[self.parent_property] ) if orig_query._populate_existing: q.add_criteria(lambda q: q.populate_existing()) if self.parent_property.order_by: if not query_info.load_with_join: eager_order_by = self.parent_property.order_by if insp.is_aliased_class: eager_order_by = [ insp._adapt_element(elem) for elem in eager_order_by ] q.add_criteria(lambda q: q.order_by(*eager_order_by)) else: def _setup_outermost_orderby(q): # imitate the same method that subquery eager loading uses, # looking for the adapted "secondary" table eagerjoin = q._from_obj[0] return q.order_by( *eagerjoin._target_adapter.copy_and_process( util.to_list(self.parent_property.order_by) ) ) q.add_criteria(_setup_outermost_orderby) if query_info.load_only_child: self._load_via_child( our_states, none_states, query_info, q, context ) else: self._load_via_parent(our_states, query_info, q, context) def _load_via_child(self, our_states, none_states, query_info, q, context): uselist = self.uselist # this sort is really for the benefit of the unit tests our_keys = sorted(our_states) while our_keys: chunk = our_keys[0 : self._chunksize] our_keys = our_keys[self._chunksize :] data = { k: v for k, v in q(context.session).params( primary_keys=[ key[0] if query_info.zero_idx else key for key in chunk ] ) } for key in chunk: # for a real foreign key and no concurrent changes to the # DB while running this method, "key" is always present in # data. However, for primaryjoins without real foreign keys # a non-None primaryjoin condition may still refer to no # related object. related_obj = data.get(key, None) for state, dict_, overwrite in our_states[key]: if not overwrite and self.key in dict_: continue state.get_impl(self.key).set_committed_value( state, dict_, related_obj if not uselist else [related_obj], ) # populate none states with empty value / collection for state, dict_, overwrite in none_states: if not overwrite and self.key in dict_: continue # note it's OK if this is a uselist=True attribute, the empty # collection will be populated state.get_impl(self.key).set_committed_value(state, dict_, None) def _load_via_parent(self, our_states, query_info, q, context): uselist = self.uselist _empty_result = () if uselist else None while our_states: chunk = our_states[0 : self._chunksize] our_states = our_states[self._chunksize :] primary_keys = [ key[0] if query_info.zero_idx else key for key, state, state_dict, overwrite in chunk ] data = collections.defaultdict(list) for k, v in itertools.groupby( q(context.session).params(primary_keys=primary_keys), lambda x: x[0], ): data[k].extend(vv[1] for vv in v) for key, state, state_dict, overwrite in chunk: if not overwrite and self.key in state_dict: continue collection = data.get(key, _empty_result) if not uselist and collection: if len(collection) > 1: util.warn( "Multiple rows returned with " "uselist=False for eagerly-loaded " "attribute '%s' " % self ) state.get_impl(self.key).set_committed_value( state, state_dict, collection[0] ) else: # note that empty tuple set on uselist=False sets the # value to None state.get_impl(self.key).set_committed_value( state, state_dict, collection ) def single_parent_validator(desc, prop): def _do_check(state, value, oldvalue, initiator): if value is not None and initiator.key == prop.key: hasparent = initiator.hasparent(attributes.instance_state(value)) if hasparent and oldvalue is not value: raise sa_exc.InvalidRequestError( "Instance %s is already associated with an instance " "of %s via its %s attribute, and is only allowed a " "single parent." % (orm_util.instance_str(value), state.class_, prop), code="bbf1", ) return value def append(state, value, initiator): return _do_check(state, value, None, initiator) def set_(state, value, oldvalue, initiator): return _do_check(state, value, oldvalue, initiator) event.listen( desc, "append", append, raw=True, retval=True, active_history=True ) event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)