diff --git a/docs/usage/amplitude.ipynb b/docs/usage/amplitude.ipynb index ec46a7adb..ff3b89003 100644 --- a/docs/usage/amplitude.ipynb +++ b/docs/usage/amplitude.ipynb @@ -120,7 +120,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "If we now use the {meth}`.HelicityAmplitudeBuilder.generate` method of this builder, we get an amplitude model without any dynamics:" + "If we now use the {meth}`.HelicityAmplitudeBuilder.formulate` method of this builder, we get an amplitude model without any dynamics:" ] }, { @@ -129,7 +129,7 @@ "metadata": {}, "outputs": [], "source": [ - "model_no_dynamics = model_builder.generate()" + "model_no_dynamics = model_builder.formulate()" ] }, { @@ -199,7 +199,7 @@ "metadata": {}, "outputs": [], "source": [ - "model = model_builder.generate()" + "model = model_builder.formulate()" ] }, { diff --git a/docs/usage/dynamics/custom.ipynb b/docs/usage/dynamics/custom.ipynb index 593ac34ad..d7ed65840 100644 --- a/docs/usage/dynamics/custom.ipynb +++ b/docs/usage/dynamics/custom.ipynb @@ -217,7 +217,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now, just like in {ref}`usage/amplitude:Build SymPy expression`, it's simply a matter of plugging this builder into {meth}`.set_dynamics` and we can {meth}`~.HelicityAmplitudeBuilder.generate` a model with this custom lineshape:" + "Now, just like in {ref}`usage/amplitude:Build SymPy expression`, it's simply a matter of plugging this builder into {meth}`.set_dynamics` and we can {meth}`~.HelicityAmplitudeBuilder.formulate` a model with this custom lineshape:" ] }, { @@ -228,7 +228,7 @@ "source": [ "for name in reaction.get_intermediate_particles().names:\n", " model_builder.set_dynamics(name, create_my_dynamics)\n", - "model = model_builder.generate()" + "model = model_builder.formulate()" ] }, { diff --git a/docs/usage/interactive.ipynb b/docs/usage/interactive.ipynb index 25910f7b4..2304c376e 100644 --- a/docs/usage/interactive.ipynb +++ b/docs/usage/interactive.ipynb @@ -123,7 +123,7 @@ "model_builder.set_dynamics(initial_state_particle, create_non_dynamic_with_ff)\n", "for name in reaction.get_intermediate_particles().names:\n", " model_builder.set_dynamics(name, create_relativistic_breit_wigner_with_ff)\n", - "model = model_builder.generate()" + "model = model_builder.formulate()" ] }, { diff --git a/src/ampform/_transition_info.py b/src/ampform/_transition_info.py deleted file mode 100644 index 66e423b12..000000000 --- a/src/ampform/_transition_info.py +++ /dev/null @@ -1,140 +0,0 @@ -from typing import Dict, Iterable, List, Optional, Tuple, Union - -from qrules import InteractionProperties, ParticleCollection -from qrules.particle import Spin -from qrules.topology import Topology -from qrules.transition import StateTransition - - -def group_transitions( - transitions: Iterable[StateTransition], -) -> List[List[StateTransition]]: - """Match final and initial states in groups. - - Each `~qrules.transition.StateTransition` corresponds to a specific state - transition amplitude. This function groups together transitions, which have the - same initial and final state (including spin). This is needed to determine - the coherency of the individual amplitude parts. - """ - transition_groups: Dict[Tuple[tuple, tuple], List[StateTransition]] = {} - for transition in transitions: - initial_state_ids = transition.topology.outgoing_edge_ids - final_state_ids = transition.topology.incoming_edge_ids - transition_group = ( - tuple( - sorted( - [ - ( - transition.states[i].particle.name, - transition.states[i].spin_projection, - ) - for i in initial_state_ids - ] - ) - ), - tuple( - sorted( - [ - ( - transition.states[i].particle.name, - transition.states[i].spin_projection, - ) - for i in final_state_ids - ] - ) - ), - ) - if transition_group not in transition_groups: - transition_groups[transition_group] = [] - transition_groups[transition_group].append(transition) - - return list(transition_groups.values()) - - -def determine_attached_final_state( - topology: Topology, state_id: int -) -> List[int]: - """Determine all final state particles of a transition. - - These are attached downward (forward in time) for a given edge (resembling - the root). - """ - edge = topology.edges[state_id] - if edge.ending_node_id is None: - return [state_id] - return sorted( - topology.get_originating_final_state_edge_ids(edge.ending_node_id) - ) - - -def get_prefactor( - transition: StateTransition, -) -> float: - """Calculate the product of all prefactors defined in this transition.""" - prefactor = 1.0 - for node_id in transition.topology.nodes: - interaction = transition.interactions[node_id] - if interaction: - temp_prefactor = __validate_float_type( - interaction.parity_prefactor - ) - if temp_prefactor is not None: - prefactor *= temp_prefactor - return prefactor - - -def __validate_float_type( - interaction_property: Optional[Union[Spin, float]] -) -> Optional[float]: - if interaction_property is not None and not isinstance( - interaction_property, (float, int) - ): - raise TypeError( - f"{interaction_property.__class__.__name__} is not of type {float.__name__}" - ) - return interaction_property - - -def generate_particle_collection( - transitions: List[StateTransition], -) -> ParticleCollection: - particles = ParticleCollection() - for transition in transitions: - for state in transition.states.values(): - if state.particle not in particles: - particles.add(state.particle) - return particles - - -def get_angular_momentum(interaction: InteractionProperties) -> Spin: - l_magnitude = interaction.l_magnitude - l_projection = interaction.l_projection - if l_magnitude is None or l_projection is None: - raise TypeError( - "Angular momentum L not defined!", l_magnitude, l_projection - ) - return Spin(l_magnitude, l_projection) - - -def get_coupled_spin(interaction: InteractionProperties) -> Spin: - s_magnitude = interaction.s_magnitude - s_projection = interaction.s_projection - if s_magnitude is None or s_projection is None: - raise TypeError("Coupled spin S not defined!") - return Spin(s_magnitude, s_projection) - - -def assert_isobar_topology(topology: Topology) -> None: - for node_id in topology.nodes: - parent_state_ids = topology.get_edge_ids_ingoing_to_node(node_id) - if len(parent_state_ids) != 1: - raise ValueError( - f"Node {node_id} has {len(parent_state_ids)} parent edges," - " so this is not an isobar decay" - ) - child_state_ids = topology.get_edge_ids_outgoing_from_node(node_id) - if len(child_state_ids) != 2: - raise ValueError( - f"Node {node_id} decays to {len(child_state_ids)} edges," - " so this is not an isobar decay" - ) diff --git a/src/ampform/helicity.py b/src/ampform/helicity.py deleted file mode 100644 index dfb3d9ee0..000000000 --- a/src/ampform/helicity.py +++ /dev/null @@ -1,756 +0,0 @@ -"""Generate an amplitude model with the helicity formalism.""" - -import logging -import operator -from functools import reduce -from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union - -import attr -import sympy as sp -from attr.validators import instance_of -from qrules import ParticleCollection, ReactionInfo -from qrules.combinatorics import ( - perform_external_edge_identical_particle_combinatorics, -) -from qrules.particle import Spin -from qrules.transition import State, StateTransition -from sympy.physics.quantum.cg import CG -from sympy.physics.quantum.spin import Rotation as Wigner -from sympy.printing.latex import LatexPrinter - -from ._transition_info import ( - generate_particle_collection, - get_angular_momentum, - get_coupled_spin, - get_prefactor, - group_transitions, -) -from .dynamics.builder import ( - ResonanceDynamicsBuilder, - TwoBodyKinematicVariableSet, - verify_signature, -) -from .kinematics import ( - HelicityAdapter, - get_helicity_angle_label, - get_invariant_mass_label, -) - -ParameterValue = Union[float, complex, int] - - -@attr.s(frozen=True, auto_attribs=True) -class _EdgeWithState: - state_id: int - state: State - - @classmethod - def from_transition( - cls, transition: StateTransition, state_id: int - ) -> "_EdgeWithState": - state = transition.states[state_id] - return cls(state_id, state) - - -@attr.s(frozen=True, auto_attribs=True) -class _TwoBodyDecay: - parent: _EdgeWithState - children: Tuple[_EdgeWithState, _EdgeWithState] - - @classmethod - def from_transition( - cls, transition: StateTransition, node_id: int - ) -> "_TwoBodyDecay": - topology = transition.topology - in_state_ids = topology.get_edge_ids_ingoing_to_node(node_id) - out_state_ids = topology.get_edge_ids_outgoing_from_node(node_id) - if len(in_state_ids) != 1 or len(out_state_ids) != 2: - raise ValueError( - f"Node {node_id} does not represent a 1-to-2 body decay!" - ) - ingoing_state_id = next(iter(in_state_ids)) - - sorted_by_id = sorted(out_state_ids) - final__state_ids = [ - i for i in sorted_by_id if i in topology.outgoing_edge_ids - ] - intermediate_state_ids = [ - i for i in sorted_by_id if i in topology.intermediate_edge_ids - ] - sorted_by_ending = tuple(intermediate_state_ids + final__state_ids) - out_state_id1, out_state_id2, *_ = tuple(sorted_by_ending) - - return cls( - parent=_EdgeWithState.from_transition( - transition, ingoing_state_id - ), - children=( - _EdgeWithState.from_transition(transition, out_state_id1), - _EdgeWithState.from_transition(transition, out_state_id2), - ), - ) - - -@attr.s(frozen=True) -class HelicityModel: - _expression: sp.Expr = attr.ib( - validator=attr.validators.instance_of(sp.Expr) - ) - _parameter_defaults: Dict[sp.Symbol, ParameterValue] = attr.ib( - validator=attr.validators.instance_of(dict) - ) - _components: Dict[str, sp.Expr] = attr.ib( - validator=attr.validators.instance_of(dict) - ) - _adapter: HelicityAdapter = attr.ib( - validator=attr.validators.instance_of(HelicityAdapter) - ) - particles: ParticleCollection = attr.ib( - validator=instance_of(ParticleCollection) - ) - - @property - def expression(self) -> sp.Expr: - return self._expression - - @property - def components(self) -> Dict[str, sp.Expr]: - return self._components - - @property - def parameter_defaults(self) -> Dict[sp.Symbol, ParameterValue]: - return self._parameter_defaults - - @property - def adapter(self) -> HelicityAdapter: - return self._adapter - - -class _HelicityAmplitudeNameGenerator: - def __init__(self) -> None: - self.parity_partner_coefficient_mapping: Dict[str, str] = {} - - def _generate_amplitude_coefficient_couple( - self, transition: StateTransition, node_id: int - ) -> Tuple[str, str, str]: - incoming_state, outgoing_states = self._retrieve_helicity_info( - transition, node_id - ) - par_name_suffix = self.generate_amplitude_coefficient_name( - transition, node_id - ) - - pp_par_name_suffix = ( - _generate_particles_string([incoming_state], use_helicity=False) - + R" \to " - + _generate_particles_string( - outgoing_states, make_parity_partner=True - ) - ) - - priority_name_suffix = par_name_suffix - if outgoing_states[0].spin_projection < 0 or ( - outgoing_states[0].spin_projection == 0 - and outgoing_states[1].spin_projection < 0 - ): - priority_name_suffix = pp_par_name_suffix - - return (par_name_suffix, pp_par_name_suffix, priority_name_suffix) - - def register_amplitude_coefficient_name( - self, transition: StateTransition - ) -> None: - for node_id in transition.topology.nodes: - ( - coefficient_suffix, - parity_partner_coefficient_suffix, - priority_partner_coefficient_suffix, - ) = self._generate_amplitude_coefficient_couple( - transition, node_id - ) - - if transition.interactions[node_id].parity_prefactor is None: - continue - - if ( - coefficient_suffix - not in self.parity_partner_coefficient_mapping - ): - if ( - parity_partner_coefficient_suffix - in self.parity_partner_coefficient_mapping - ): - if ( - parity_partner_coefficient_suffix - == priority_partner_coefficient_suffix - ): - self.parity_partner_coefficient_mapping[ - coefficient_suffix - ] = parity_partner_coefficient_suffix - else: - self.parity_partner_coefficient_mapping[ - parity_partner_coefficient_suffix - ] = coefficient_suffix - self.parity_partner_coefficient_mapping[ - coefficient_suffix - ] = coefficient_suffix - - else: - # if neither this coefficient nor its partner are registered just add it - self.parity_partner_coefficient_mapping[ - coefficient_suffix - ] = coefficient_suffix - - def generate_unique_amplitude_name( - self, - transition: StateTransition, - node_id: Optional[int] = None, - ) -> str: - """Generates a unique name for the amplitude corresponding. - - That is, corresponging to the given :class:`StateTransition`. If - ``node_id`` is given, it generates a unique name for the partial - amplitude corresponding to the interaction node of the given - :class:`StateTransition`. - """ - name = "" - if node_id is None: - node_ids = transition.topology.nodes - else: - node_ids = frozenset({node_id}) - names: List[str] = [] - for i in node_ids: - incoming_state, outgoing_states = self._retrieve_helicity_info( - transition, i - ) - name = ( - _generate_particles_string([incoming_state]) - + R" \to " - + _generate_particles_string(outgoing_states) - ) - names.append(name) - return "; ".join(names) - - @staticmethod - def _retrieve_helicity_info( - transition: StateTransition, node_id: int - ) -> Tuple[State, Tuple[State, State]]: - in_edge_ids = transition.topology.get_edge_ids_ingoing_to_node(node_id) - out_edge_ids = transition.topology.get_edge_ids_outgoing_from_node( - node_id - ) - in_helicity_list = _get_helicity_particles(transition, in_edge_ids) - out_helicity_list = _get_helicity_particles(transition, out_edge_ids) - if len(in_helicity_list) != 1 or len(out_helicity_list) != 2: - raise ValueError(f"Node {node_id} it not a 1-to-2 decay") - return ( - in_helicity_list[0], - (out_helicity_list[0], out_helicity_list[1]), - ) - - def generate_amplitude_coefficient_name( - self, transition: StateTransition, node_id: int - ) -> str: - """Generate partial amplitude coefficient name suffix.""" - in_hel_info, out_hel_info = self._retrieve_helicity_info( - transition, node_id - ) - return ( - _generate_particles_string([in_hel_info], use_helicity=False) - + R" \to " - + _generate_particles_string(out_hel_info) - ) - - def generate_sequential_amplitude_suffix( - self, transition: StateTransition - ) -> str: - """Generate unique suffix for a sequential amplitude transition.""" - coefficient_names: List[str] = [] - for node_id in transition.topology.nodes: - suffix = self.generate_amplitude_coefficient_name( - transition, node_id - ) - if suffix in self.parity_partner_coefficient_mapping: - suffix = self.parity_partner_coefficient_mapping[suffix] - coefficient_names.append(suffix) - return "; ".join(coefficient_names) - - -class _CanonicalAmplitudeNameGenerator(_HelicityAmplitudeNameGenerator): - def generate_amplitude_coefficient_name( - self, transition: StateTransition, node_id: int - ) -> str: - incoming_state, outgoing_states = self._retrieve_helicity_info( - transition, node_id - ) - return ( - _generate_particles_string([incoming_state], use_helicity=False) - + self.__generate_ls_arrow(transition, node_id) - + _generate_particles_string(outgoing_states, use_helicity=False) - ) - - def generate_unique_amplitude_name( - self, - transition: StateTransition, - node_id: Optional[int] = None, - ) -> str: - if isinstance(node_id, int): - node_ids = frozenset({node_id}) - else: - node_ids = transition.topology.nodes - names: List[str] = [] - for node in node_ids: - helicity_name = super().generate_unique_amplitude_name( - transition, node - ) - canonical_name = helicity_name.replace( - R" \to ", - self.__generate_ls_arrow(transition, node), - ) - names.append(canonical_name) - return "; ".join(names) - - def __generate_ls_arrow( - self, transition: StateTransition, node_id: int - ) -> str: - angular_momentum, spin = self.__get_ls_coupling(transition, node_id) - return fR" \xrightarrow[S={spin}]{{L={angular_momentum}}} " - - @staticmethod - def __get_ls_coupling( - transition: StateTransition, node_id: int - ) -> Tuple[sp.Rational, sp.Rational]: - interaction = transition.interactions[node_id] - ang_orb_mom = sp.Rational(get_angular_momentum(interaction).magnitude) - spin = sp.Rational(get_coupled_spin(interaction).magnitude) - return ang_orb_mom, spin - - -def _get_transition_group_label( - transition_group: List[StateTransition], -) -> str: - label = "" - if transition_group: - first_transition = next(iter(transition_group)) - ise = first_transition.topology.incoming_edge_ids - fse = first_transition.topology.outgoing_edge_ids - is_names = _get_helicity_particles(first_transition, ise) - fs_names = _get_helicity_particles(first_transition, fse) - label += ( - _generate_particles_string(is_names) - + R" \to " - + _generate_particles_string(fs_names) - ) - return label - - -def _get_helicity_particles( - transition: StateTransition, state_ids: Iterable[int] -) -> List[State]: - """Get a sorted list of `~qrules.transition.State` instances. - - In order to ensure correct naming of amplitude coefficients the list has to - be sorted by name. The same coefficient names have to be created for two - transitions that only differ from a kinematic standpoint (swapped external - edges). - """ - helicity_list = [transition.states[i] for i in state_ids] - return sorted(helicity_list, key=lambda s: s.particle.name) - - -def _generate_particles_string( - helicity_list: Iterable[State], - use_helicity: bool = True, - make_parity_partner: bool = False, -) -> str: - output_string = "" - for state in helicity_list: - if state.particle.latex is not None: - output_string += state.particle.latex - else: - output_string += state.particle.name - if use_helicity: - if make_parity_partner: - helicity = -1 * state.spin_projection - else: - helicity = state.spin_projection - helicity = __attempt_int_cast(helicity) - if helicity > 0: - helicity_str = f"+{helicity}" - else: - helicity_str = str(helicity) - output_string += f"_{{{helicity_str}}}" - output_string += " " - return output_string[:-1] - - -def _generate_kinematic_variable_set( - transition: StateTransition, node_id: int -) -> TwoBodyKinematicVariableSet: - decay = _TwoBodyDecay.from_transition(transition, node_id) - inv_mass, phi, theta = _generate_kinematic_variables(transition, node_id) - child1_mass = sp.Symbol( - get_invariant_mass_label( - transition.topology, decay.children[0].state_id - ), - real=True, - ) - child2_mass = sp.Symbol( - get_invariant_mass_label( - transition.topology, decay.children[1].state_id - ), - real=True, - ) - return TwoBodyKinematicVariableSet( - incoming_state_mass=inv_mass, - outgoing_state_mass1=child1_mass, - outgoing_state_mass2=child2_mass, - helicity_theta=theta, - helicity_phi=phi, - angular_momentum=_extract_angular_momentum(transition, node_id), - ) - - -def _extract_angular_momentum( - transition: StateTransition, node_id: int -) -> int: - interaction = transition.interactions[node_id] - if interaction.l_magnitude is not None: - return interaction.l_magnitude - - incoming_ids = transition.topology.get_edge_ids_ingoing_to_node(node_id) - outgoing_ids = transition.topology.get_edge_ids_outgoing_from_node(node_id) - state_id = None - if len(incoming_ids) == 1: - state_id = next(iter(incoming_ids)) - elif len(outgoing_ids) == 1: - state_id = next(iter(outgoing_ids)) - - if state_id is None: - raise ValueError( - f"StateTransition does not have one to two body structure" - f" at node with id={node_id}" - ) - - spin_magnitude = transition.states[state_id].particle.spin - if spin_magnitude.is_integer(): - return int(spin_magnitude) - - raise ValueError( - f"Spin magnitude ({spin_magnitude}) of single particle state cannot be" - f" used as the angular momentum as it is not integral!" - ) - - -def _generate_kinematic_variables( - transition: StateTransition, node_id: int -) -> Tuple[sp.Symbol, sp.Symbol, sp.Symbol]: - """Generate symbol for invariant mass, phi angle, and theta angle.""" - decay = _TwoBodyDecay.from_transition(transition, node_id) - phi_label, theta_label = get_helicity_angle_label( - transition.topology, decay.children[0].state_id - ) - inv_mass_label = get_invariant_mass_label( - transition.topology, decay.parent.state_id - ) - return ( - sp.Symbol(inv_mass_label, real=True), - sp.Symbol(phi_label, real=True), - sp.Symbol(theta_label, real=True), - ) - - -class HelicityAmplitudeBuilder: # pylint: disable=too-many-instance-attributes - """Amplitude model generator for the helicity formalism.""" - - def __init__(self, reaction: ReactionInfo) -> None: - self.name_generator = _HelicityAmplitudeNameGenerator() - self.__reaction = reaction - self.__parameter_defaults: Dict[sp.Symbol, ParameterValue] = {} - self.__components: Dict[str, sp.Expr] = {} - self.__dynamics_choices: Dict[ - _TwoBodyDecay, ResonanceDynamicsBuilder - ] = {} - - if len(reaction.transitions) < 1: - raise ValueError( - f"At least one {StateTransition.__name__} required to" - " genenerate an amplitude model!" - ) - self.__adapter = HelicityAdapter(reaction) - for grouping in reaction.transition_groups: - self.__adapter.register_topology(grouping.topology) - self.__particles = generate_particle_collection(reaction.transitions) - - def set_dynamics( - self, particle_name: str, dynamics_builder: ResonanceDynamicsBuilder - ) -> None: - verify_signature(dynamics_builder, ResonanceDynamicsBuilder) - for transition in self.__reaction.transitions: - for node_id in transition.topology.nodes: - decay = _TwoBodyDecay.from_transition(transition, node_id) - decay_particle = decay.parent.state.particle - if decay_particle.name == particle_name: - self.__dynamics_choices[decay] = dynamics_builder - - def generate(self) -> HelicityModel: - self.__components = {} - self.__parameter_defaults = {} - return HelicityModel( - expression=self.__generate_intensity(), - components=self.__components, - parameter_defaults=self.__parameter_defaults, - adapter=self.__adapter, - particles=self.__particles, - ) - - def __generate_intensity(self) -> sp.Expr: - transition_groups = group_transitions(self.__reaction.transitions) - logging.debug("There are %d transition groups", len(transition_groups)) - - self.__create_parameter_couplings(transition_groups) - coherent_intensities = [] - for group in transition_groups: - coherent_intensities.append( - self.__generate_coherent_intensity(group) - ) - if len(coherent_intensities) == 0: - raise ValueError("List of coherent intensities cannot be empty") - return sum(coherent_intensities) - - def __create_dynamics( - self, transition: StateTransition, node_id: int - ) -> sp.Expr: - decay = _TwoBodyDecay.from_transition(transition, node_id) - if decay in self.__dynamics_choices: - builder = self.__dynamics_choices[decay] - variable_set = _generate_kinematic_variable_set( - transition, node_id - ) - expression, parameters = builder( - decay.parent.state.particle, variable_set - ) - for par, value in parameters.items(): - if par in self.__parameter_defaults: - previous_value = self.__parameter_defaults[par] - if value != previous_value: - logging.warning( - f"Default value for parameter {par.name}" - f" inconsistent {value} and {previous_value}" - ) - self.__parameter_defaults[par] = value - - return expression - - return 1 - - def __create_parameter_couplings( - self, transition_groups: List[List[StateTransition]] - ) -> None: - for graph_group in transition_groups: - for transition in graph_group: - self.name_generator.register_amplitude_coefficient_name( - transition - ) - - def __generate_coherent_intensity( - self, - transition_group: List[StateTransition], - ) -> sp.Expr: - graph_group_label = _get_transition_group_label(transition_group) - sequential_expressions: List[sp.Expr] = [] - for transition in transition_group: - sequential_graphs = ( - perform_external_edge_identical_particle_combinatorics( - transition.to_graph() - ) - ) - for graph in sequential_graphs: - transition = StateTransition.from_graph(graph) - expression = self.__generate_sequential_decay(transition) - sequential_expressions.append(expression) - amplitude_sum = sum(sequential_expressions) - coh_intensity = abs(amplitude_sum) ** 2 - self.__components[fR"I_{{{graph_group_label}}}"] = coh_intensity - return coh_intensity - - def __generate_sequential_decay( - self, transition: StateTransition - ) -> sp.Expr: - partial_decays: List[sp.Symbol] = [ - self._generate_partial_decay(transition, node_id) - for node_id in transition.topology.nodes - ] - sequential_amplitudes = reduce(operator.mul, partial_decays) - - coefficient = self.__generate_amplitude_coefficient(transition) - prefactor = self.__generate_amplitude_prefactor(transition) - expression = coefficient * sequential_amplitudes - if prefactor is not None: - expression = prefactor * expression - self.__components[ - f"A_{{{self.name_generator.generate_unique_amplitude_name(transition)}}}" - ] = expression - return expression - - def _generate_partial_decay( # pylint: disable=too-many-locals - self, transition: StateTransition, node_id: int - ) -> sp.Expr: - wigner_d = self._generate_wigner_d(transition, node_id) - dynamics_symbol = self.__create_dynamics(transition, node_id) - return wigner_d * dynamics_symbol - - @staticmethod - def _generate_wigner_d( - transition: StateTransition, node_id: int - ) -> sp.Symbol: - decay = _TwoBodyDecay.from_transition(transition, node_id) - _, phi, theta = _generate_kinematic_variables(transition, node_id) - - return Wigner.D( - j=sp.nsimplify(decay.parent.state.particle.spin), - m=sp.nsimplify(decay.parent.state.spin_projection), - mp=sp.nsimplify( - decay.children[0].state.spin_projection - - decay.children[1].state.spin_projection - ), - alpha=-phi, - beta=theta, - gamma=0, - ) - - def __generate_amplitude_coefficient( - self, transition: StateTransition - ) -> sp.Symbol: - """Generate coefficient parameter for a sequential amplitude transition. - - Generally, each partial amplitude of a sequential amplitude transition - should check itself if it or a parity partner is already defined. If so - a coupled coefficient is introduced. - """ - suffix = self.name_generator.generate_sequential_amplitude_suffix( - transition - ) - coefficient_symbol = sp.Symbol(f"C_{{{suffix}}}") - self.__parameter_defaults[coefficient_symbol] = complex(1, 0) - return coefficient_symbol - - def __generate_amplitude_prefactor( - self, transition: StateTransition - ) -> Optional[float]: - prefactor = get_prefactor(transition) - if prefactor != 1.0: - for node_id in transition.topology.nodes: - raw_suffix = ( - self.name_generator.generate_amplitude_coefficient_name( - transition, node_id - ) - ) - if ( - raw_suffix - in self.name_generator.parity_partner_coefficient_mapping - ): - coefficient_suffix = ( - self.name_generator.parity_partner_coefficient_mapping[ - raw_suffix - ] - ) - if coefficient_suffix != raw_suffix: - return prefactor - return None - - -class CanonicalAmplitudeBuilder(HelicityAmplitudeBuilder): - r"""Amplitude model generator for the canonical helicity formalism. - - This class defines a full amplitude in the canonical formalism, using the - helicity formalism as a foundation. The key here is that we take the full - helicity intensity as a template, and just exchange the helicity amplitudes - :math:`F` as a sum of canonical amplitudes :math:`A`: - - .. math:: - - F^J_{\lambda_1,\lambda_2} = \sum_{LS} \mathrm{norm}(A^J_{LS})C^2. - - Here, :math:`C` stands for `Clebsch-Gordan factor - `_. - """ - - def __init__(self, reaction_result: ReactionInfo) -> None: - super().__init__(reaction_result) - self.name_generator = _CanonicalAmplitudeNameGenerator() - - def _generate_partial_decay( # pylint: disable=too-many-locals - self, transition: StateTransition, node_id: int - ) -> sp.Symbol: - amplitude = super()._generate_partial_decay(transition, node_id) - - interaction = transition.interactions[node_id] - ang_mom = get_angular_momentum(interaction) - spin = get_coupled_spin(interaction) - if ang_mom.projection != 0.0: - raise ValueError(f"Projection of L is non-zero!: {ang_mom}") - - topology = transition.topology - in_state_ids = topology.get_edge_ids_ingoing_to_node(node_id) - out_state_ids = topology.get_edge_ids_outgoing_from_node(node_id) - - incoming_state_id = next(iter(in_state_ids)) - incoming_state = transition.states[incoming_state_id] - parent_spin = Spin( - incoming_state.particle.spin, - incoming_state.spin_projection, - ) - - daughter_spins: List[Spin] = [] - for state_id in out_state_ids: - state = transition.states[state_id] - daughter_spin = Spin( - state.particle.spin, - state.spin_projection, - ) - if daughter_spin is not None and isinstance(daughter_spin, Spin): - daughter_spins.append(daughter_spin) - - decay_particle_lambda = ( - daughter_spins[0].projection - daughter_spins[1].projection - ) - - cg_ls = CG( - j1=sp.nsimplify(ang_mom.magnitude), - m1=sp.nsimplify(ang_mom.projection), - j2=sp.nsimplify(spin.magnitude), - m2=sp.nsimplify(decay_particle_lambda), - j3=sp.nsimplify(parent_spin.magnitude), - m3=sp.nsimplify(decay_particle_lambda), - ) - cg_ss = CG( - j1=sp.nsimplify(daughter_spins[0].magnitude), - m1=sp.nsimplify(daughter_spins[0].projection), - j2=sp.nsimplify(daughter_spins[1].magnitude), - m2=sp.nsimplify(-daughter_spins[1].projection), - j3=sp.nsimplify(spin.magnitude), - m3=sp.nsimplify(decay_particle_lambda), - ) - return cg_ls * cg_ss * amplitude - - -# https://github.com/sympy/sympy/issues/21001 -# pylint: disable=protected-access, unused-argument -def _latex_fix(self: Type[CG], printer: LatexPrinter, *args: Any) -> str: - j3, m3, j1, m1, j2, m2 = map( - printer._print, - (self.j3, self.m3, self.j1, self.m1, self.j2, self.m2), - ) - return f"{{C^{{{j3},{m3}}}_{{{j1},{m1},{j2},{m2}}}}}" - - -CG._latex = _latex_fix - - -def __attempt_int_cast(value: float) -> Union[float, int]: - if isinstance(value, int): - return value - if value.is_integer(): - return int(value) - return value diff --git a/src/ampform/helicity/__init__.py b/src/ampform/helicity/__init__.py new file mode 100644 index 000000000..35650c7f3 --- /dev/null +++ b/src/ampform/helicity/__init__.py @@ -0,0 +1,469 @@ +"""Generate an amplitude model with the helicity formalism.""" + +import logging +import operator +from collections import defaultdict +from functools import reduce +from typing import ( + Any, + DefaultDict, + Dict, + Iterable, + List, + Optional, + Tuple, + Type, + Union, +) + +import attr +import sympy as sp +from attr.validators import instance_of +from qrules.combinatorics import ( + perform_external_edge_identical_particle_combinatorics, +) +from qrules.particle import ParticleCollection +from qrules.transition import ReactionInfo, StateTransition +from sympy.physics.quantum.cg import CG +from sympy.physics.quantum.spin import Rotation as Wigner +from sympy.printing.latex import LatexPrinter + +from ampform.dynamics.builder import ( + ResonanceDynamicsBuilder, + TwoBodyKinematicVariableSet, + verify_signature, +) +from ampform.kinematics import ( + HelicityAdapter, + get_helicity_angle_label, + get_invariant_mass_label, +) + +from .decay import TwoBodyDecay, get_angular_momentum, get_coupled_spin +from .naming import ( + CanonicalAmplitudeNameGenerator, + HelicityAmplitudeNameGenerator, + generate_transition_label, +) + +ParameterValue = Union[float, complex, int] + + +@attr.s(frozen=True) +class HelicityModel: + _expression: sp.Expr = attr.ib( + validator=attr.validators.instance_of(sp.Expr) + ) + _parameter_defaults: Dict[sp.Symbol, ParameterValue] = attr.ib( + validator=attr.validators.instance_of(dict) + ) + _components: Dict[str, sp.Expr] = attr.ib( + validator=attr.validators.instance_of(dict) + ) + _adapter: HelicityAdapter = attr.ib( + validator=attr.validators.instance_of(HelicityAdapter) + ) + particles: ParticleCollection = attr.ib( + validator=instance_of(ParticleCollection) + ) + + @property + def expression(self) -> sp.Expr: + return self._expression + + @property + def components(self) -> Dict[str, sp.Expr]: + return self._components + + @property + def parameter_defaults(self) -> Dict[sp.Symbol, ParameterValue]: + return self._parameter_defaults + + @property + def adapter(self) -> HelicityAdapter: + return self._adapter + + +class HelicityAmplitudeBuilder: + """Amplitude model generator for the helicity formalism.""" + + def __init__(self, reaction: ReactionInfo) -> None: + self._name_generator = HelicityAmplitudeNameGenerator() + self.__reaction = reaction + self.__parameter_defaults: Dict[sp.Symbol, ParameterValue] = {} + self.__components: Dict[str, sp.Expr] = {} + self.__dynamics_choices: Dict[ + TwoBodyDecay, ResonanceDynamicsBuilder + ] = {} + + if len(reaction.transitions) < 1: + raise ValueError( + f"At least one {StateTransition.__name__} required to" + " genenerate an amplitude model!" + ) + self.__adapter = HelicityAdapter(reaction) + for grouping in reaction.transition_groups: + self.__adapter.register_topology(grouping.topology) + self.__particles = extract_particle_collection(reaction.transitions) + + def set_dynamics( + self, particle_name: str, dynamics_builder: ResonanceDynamicsBuilder + ) -> None: + verify_signature(dynamics_builder, ResonanceDynamicsBuilder) + for transition in self.__reaction.transitions: + for node_id in transition.topology.nodes: + decay = TwoBodyDecay.from_transition(transition, node_id) + decaying_particle = decay.parent.particle + if decaying_particle.name == particle_name: + self.__dynamics_choices[decay] = dynamics_builder + + def formulate(self) -> HelicityModel: + self.__components = {} + self.__parameter_defaults = {} + return HelicityModel( + expression=self.__formulate_top_expression(), + components=self.__components, + parameter_defaults=self.__parameter_defaults, + adapter=self.__adapter, + particles=self.__particles, + ) + + def __formulate_top_expression(self) -> sp.Expr: + transition_groups = group_transitions(self.__reaction.transitions) + self.__register_parameter_couplings(transition_groups) + coherent_intensities = [ + self.__formulate_coherent_intensity(group) + for group in transition_groups + ] + return sum(coherent_intensities) + + def __register_parameter_couplings( + self, transition_groups: List[List[StateTransition]] + ) -> None: + for graph_group in transition_groups: + for transition in graph_group: + self._name_generator.register_amplitude_coefficient_name( + transition + ) + + def __formulate_coherent_intensity( + self, transition_group: List[StateTransition] + ) -> sp.Expr: + graph_group_label = generate_transition_label(transition_group[0]) + sequential_expressions: List[sp.Expr] = [] + for transition in transition_group: + sequential_graphs = ( + perform_external_edge_identical_particle_combinatorics( + transition.to_graph() + ) + ) + for graph in sequential_graphs: + transition = StateTransition.from_graph(graph) + expression = self.__formulate_sequential_decay(transition) + sequential_expressions.append(expression) + amplitude_sum = sum(sequential_expressions) + coherent_intensity = abs(amplitude_sum) ** 2 + self.__components[fR"I_{{{graph_group_label}}}"] = coherent_intensity + return coherent_intensity + + def __formulate_sequential_decay( + self, transition: StateTransition + ) -> sp.Expr: + partial_decays: List[sp.Symbol] = [ + self._formulate_partial_decay(transition, node_id) + for node_id in transition.topology.nodes + ] + sequential_amplitudes = reduce(operator.mul, partial_decays) + + coefficient = self.__generate_amplitude_coefficient(transition) + prefactor = self.__generate_amplitude_prefactor(transition) + expression = coefficient * sequential_amplitudes + if prefactor is not None: + expression = prefactor * expression + self.__components[ + f"A_{{{self._name_generator.generate_amplitude_name(transition)}}}" + ] = expression + return expression + + def _formulate_partial_decay( + self, transition: StateTransition, node_id: int + ) -> sp.Expr: + wigner_d = formulate_wigner_d(transition, node_id) + dynamics = self.__formulate_dynamics(transition, node_id) + return wigner_d * dynamics + + def __formulate_dynamics( + self, transition: StateTransition, node_id: int + ) -> sp.Expr: + decay = TwoBodyDecay.from_transition(transition, node_id) + if decay not in self.__dynamics_choices: + return 1 + + builder = self.__dynamics_choices[decay] + variable_set = _generate_kinematic_variable_set(transition, node_id) + expression, parameters = builder(decay.parent.particle, variable_set) + for par, value in parameters.items(): + if par in self.__parameter_defaults: + previous_value = self.__parameter_defaults[par] + if value != previous_value: + logging.warning( + f'New default value {value} for parameter "{par.name}"' + f" is inconsistent with existing value {previous_value}" + ) + self.__parameter_defaults[par] = value + + return expression + + def __generate_amplitude_coefficient( + self, transition: StateTransition + ) -> sp.Symbol: + """Generate coefficient parameter for a sequential amplitude transition. + + Generally, each partial amplitude of a sequential amplitude transition + should check itself if it or a parity partner is already defined. If so + a coupled coefficient is introduced. + """ + suffix = self._name_generator.generate_sequential_amplitude_suffix( + transition + ) + coefficient_symbol = sp.Symbol(f"C_{{{suffix}}}") + self.__parameter_defaults[coefficient_symbol] = complex(1, 0) + return coefficient_symbol + + def __generate_amplitude_prefactor( + self, transition: StateTransition + ) -> Optional[float]: + prefactor = get_prefactor(transition) + if prefactor != 1.0: + for node_id in transition.topology.nodes: + raw_suffix = self._name_generator.generate_coefficient_name( + transition, node_id + ) + if ( + raw_suffix + in self._name_generator.parity_partner_coefficient_mapping + ): + coefficient_suffix = self._name_generator.parity_partner_coefficient_mapping[ + raw_suffix + ] + if coefficient_suffix != raw_suffix: + return prefactor + return None + + +class CanonicalAmplitudeBuilder(HelicityAmplitudeBuilder): + r"""Amplitude model generator for the canonical helicity formalism. + + This class defines a full amplitude in the canonical formalism, using the + helicity formalism as a foundation. The key here is that we take the full + helicity intensity as a template, and just exchange the helicity amplitudes + :math:`F` as a sum of canonical amplitudes :math:`A`: + + .. math:: + + F^J_{\lambda_1,\lambda_2} = \sum_{LS} \mathrm{norm}(A^J_{LS})C^2. + + Here, :math:`C` stands for `Clebsch-Gordan factor + `_. + """ + + def __init__(self, reaction_result: ReactionInfo) -> None: + super().__init__(reaction_result) + self._name_generator = CanonicalAmplitudeNameGenerator() + + def _formulate_partial_decay( + self, transition: StateTransition, node_id: int + ) -> sp.Expr: + amplitude = super()._formulate_partial_decay(transition, node_id) + cg_coefficients = formulate_clebsch_gordan_coefficients( + transition, node_id + ) + return cg_coefficients * amplitude + + +def extract_particle_collection( + transitions: Iterable[StateTransition], +) -> ParticleCollection: + """Collect all particles from a collection of state transitions.""" + particles = ParticleCollection() + for transition in transitions: + for state in transition.states.values(): + if state.particle not in particles: + particles.add(state.particle) + return particles + + +def formulate_clebsch_gordan_coefficients( + transition: StateTransition, node_id: int +) -> sp.Expr: + """Compute two Clebsch-Gordan coefficients for a state transition node. + + >>> import qrules + >>> reaction = qrules.generate_transitions( + ... initial_state=[("J/psi(1S)", [+1])], + ... final_state=[("gamma", [+1]), "f(0)(980)"], + ... ) + >>> transition = reaction.transitions[0] + >>> formulate_clebsch_gordan_coefficients(transition, node_id=0) + CG(0, 0, 1, 1, 1, 1)*CG(1, 1, 0, 0, 1, 1) + + .. seealso:: :doc:`sympy:modules/physics/quantum/cg` + """ + decay = TwoBodyDecay.from_transition(transition, node_id) + + angular_momentum = get_angular_momentum(decay.interaction) + coupled_spin = get_coupled_spin(decay.interaction) + if angular_momentum.projection != 0.0: + raise ValueError(f"Projection of L is non-zero!: {angular_momentum}") + + parent = decay.parent + child1 = decay.children[0] + child2 = decay.children[1] + + decay_particle_lambda = child1.spin_projection - child2.spin_projection + cg_ls = CG( + j1=sp.Rational(angular_momentum.magnitude), + m1=sp.Rational(angular_momentum.projection), + j2=sp.Rational(coupled_spin.magnitude), + m2=sp.Rational(decay_particle_lambda), + j3=sp.Rational(parent.particle.spin), + m3=sp.Rational(decay_particle_lambda), + ) + cg_ss = CG( + j1=sp.Rational(child1.particle.spin), + m1=sp.Rational(child1.spin_projection), + j2=sp.Rational(child2.particle.spin), + m2=sp.Rational(-child2.spin_projection), + j3=sp.Rational(coupled_spin.magnitude), + m3=sp.Rational(decay_particle_lambda), + ) + return sp.Mul(cg_ls, cg_ss, evaluate=False) + + +def formulate_wigner_d(transition: StateTransition, node_id: int) -> sp.Expr: + """Compute `~sympy.physics.quantum.spin.WignerD` for a state transition node. + + >>> import qrules + >>> reaction = qrules.generate_transitions( + ... initial_state=[("J/psi(1S)", [+1])], + ... final_state=[("gamma", [+1]), "f(0)(980)"], + ... ) + >>> transition = reaction.transitions[0] + >>> formulate_wigner_d(transition, node_id=0) + WignerD(1, 1, 1, -phi_0, theta_0, 0) + """ + decay = TwoBodyDecay.from_transition(transition, node_id) + _, phi, theta = _generate_kinematic_variables(transition, node_id) + return Wigner.D( + j=sp.Rational(decay.parent.particle.spin), + m=sp.Rational(decay.parent.spin_projection), + mp=sp.Rational( + decay.children[0].spin_projection + - decay.children[1].spin_projection + ), + alpha=-phi, + beta=theta, + gamma=0, + ) + + +def get_prefactor(transition: StateTransition) -> float: + """Calculate the product of all prefactors defined in this transition.""" + prefactor = 1.0 + for node_id in transition.topology.nodes: + interaction = transition.interactions[node_id] + if interaction and interaction.parity_prefactor is not None: + prefactor *= interaction.parity_prefactor + return prefactor + + +def group_transitions( + transitions: Iterable[StateTransition], +) -> List[List[StateTransition]]: + """Match final and initial states in groups. + + Each `~qrules.transition.StateTransition` corresponds to a specific state + transition amplitude. This function groups together transitions, which have + the same initial and final state (including spin). This is needed to + determine the coherency of the individual amplitude parts. + """ + transition_groups: DefaultDict[ + Tuple[ + Tuple[Tuple[str, float], ...], + Tuple[Tuple[str, float], ...], + ], + List[StateTransition], + ] = defaultdict(list) + for transition in transitions: + initial_state = sorted( + ( + transition.states[i].particle.name, + transition.states[i].spin_projection, + ) + for i in transition.topology.incoming_edge_ids + ) + final_state = sorted( + ( + transition.states[i].particle.name, + transition.states[i].spin_projection, + ) + for i in transition.topology.outgoing_edge_ids + ) + group_key = (tuple(initial_state), tuple(final_state)) + transition_groups[group_key].append(transition) + + return list(transition_groups.values()) + + +def _generate_kinematic_variable_set( + transition: StateTransition, node_id: int +) -> TwoBodyKinematicVariableSet: + decay = TwoBodyDecay.from_transition(transition, node_id) + inv_mass, phi, theta = _generate_kinematic_variables(transition, node_id) + child1_mass = sp.Symbol( + get_invariant_mass_label(transition.topology, decay.children[0].id), + real=True, + ) + child2_mass = sp.Symbol( + get_invariant_mass_label(transition.topology, decay.children[1].id), + real=True, + ) + return TwoBodyKinematicVariableSet( + incoming_state_mass=inv_mass, + outgoing_state_mass1=child1_mass, + outgoing_state_mass2=child2_mass, + helicity_theta=theta, + helicity_phi=phi, + angular_momentum=decay.extract_angular_momentum(), + ) + + +def _generate_kinematic_variables( + transition: StateTransition, node_id: int +) -> Tuple[sp.Symbol, sp.Symbol, sp.Symbol]: + """Generate symbol for invariant mass, phi angle, and theta angle.""" + decay = TwoBodyDecay.from_transition(transition, node_id) + phi_label, theta_label = get_helicity_angle_label( + transition.topology, decay.children[0].id + ) + inv_mass_label = get_invariant_mass_label( + transition.topology, decay.parent.id + ) + return ( + sp.Symbol(inv_mass_label, real=True), + sp.Symbol(phi_label, real=True), + sp.Symbol(theta_label, real=True), + ) + + +# https://github.com/sympy/sympy/issues/21001 +# pylint: disable=protected-access, unused-argument +def _latex_fix(self: Type[CG], printer: LatexPrinter, *args: Any) -> str: + j3, m3, j1, m1, j2, m2 = map( + printer._print, + (self.j3, self.m3, self.j1, self.m1, self.j2, self.m2), + ) + return f"{{C^{{{j3},{m3}}}_{{{j1},{m1},{j2},{m2}}}}}" + + +CG._latex = _latex_fix diff --git a/src/ampform/helicity/decay.py b/src/ampform/helicity/decay.py new file mode 100644 index 000000000..dbe9c191c --- /dev/null +++ b/src/ampform/helicity/decay.py @@ -0,0 +1,141 @@ +"""Extract two-body decay info from a `~qrules.transition.StateTransition`.""" + +from typing import Iterable, List, Tuple + +import attr +from qrules.particle import Spin +from qrules.quantum_numbers import InteractionProperties +from qrules.transition import State, StateTransition + +from ampform.kinematics import assert_two_body_decay + + +@attr.s(auto_attribs=True, frozen=True) +class StateWithID(State): + """Extension of `~qrules.transition.State` that embeds the state ID.""" + + id: int # noqa: A003 + + @classmethod + def from_transition( + cls, transition: StateTransition, state_id: int + ) -> "StateWithID": + state = transition.states[state_id] + return cls( + id=state_id, + particle=state.particle, + spin_projection=state.spin_projection, + ) + + +@attr.s(auto_attribs=True, frozen=True) +class TwoBodyDecay: + """Two-body sub-decay in a `~qrules.transition.StateTransition`. + + This container class ensures that: + + 1. a selected node in a `~qrules.transition.StateTransition` is indeed a + 1-to-2 body decay + + 2. its two `.children` are sorted by whether they decay further or not (see + `.get_helicity_angle_label`, `.formulate_wigner_d`, and + `.formulate_clebsch_gordan_coefficients`). + + 3. the `.TwoBodyDecay` is hashable, so that it can be used as a key (see + `.set_dynamics`.) + """ + + parent: StateWithID + children: Tuple[StateWithID, StateWithID] + interaction: InteractionProperties + + @classmethod + def from_transition( + cls, transition: StateTransition, node_id: int + ) -> "TwoBodyDecay": + topology = transition.topology + in_state_ids = topology.get_edge_ids_ingoing_to_node(node_id) + out_state_ids = topology.get_edge_ids_outgoing_from_node(node_id) + if len(in_state_ids) != 1 or len(out_state_ids) != 2: + raise ValueError( + f"Node {node_id} does not represent a 1-to-2 body decay!" + ) + + sorted_by_id = sorted(out_state_ids) + final_state_ids = [ + i for i in sorted_by_id if i in topology.outgoing_edge_ids + ] + intermediate_state_ids = [ + i for i in sorted_by_id if i in topology.intermediate_edge_ids + ] + sorted_by_ending = tuple(intermediate_state_ids + final_state_ids) + + ingoing_state_id = next(iter(in_state_ids)) + out_state_id1, out_state_id2, *_ = sorted_by_ending + return cls( + parent=StateWithID.from_transition(transition, ingoing_state_id), + children=( + StateWithID.from_transition(transition, out_state_id1), + StateWithID.from_transition(transition, out_state_id2), + ), + interaction=transition.interactions[node_id], + ) + + def extract_angular_momentum(self) -> int: + angular_momentum = self.interaction.l_magnitude + if angular_momentum is not None: + return angular_momentum + spin_magnitude = self.parent.particle.spin + if spin_magnitude.is_integer(): + return int(spin_magnitude) + raise ValueError( + f"Spin magnitude ({spin_magnitude}) of single particle state cannot be" + f" used as the angular momentum as it is not integral!" + ) + + +def get_angular_momentum(interaction: InteractionProperties) -> Spin: + l_magnitude = interaction.l_magnitude + l_projection = interaction.l_projection + if l_magnitude is None or l_projection is None: + raise TypeError( + "Angular momentum L not defined!", l_magnitude, l_projection + ) + return Spin(l_magnitude, l_projection) + + +def get_coupled_spin(interaction: InteractionProperties) -> Spin: + s_magnitude = interaction.s_magnitude + s_projection = interaction.s_projection + if s_magnitude is None or s_projection is None: + raise TypeError("Coupled spin S not defined!") + return Spin(s_magnitude, s_projection) + + +def get_helicity_info( + transition: StateTransition, node_id: int +) -> Tuple[State, Tuple[State, State]]: + """Extract in- and outgoing states for a two-body decay node.""" + assert_two_body_decay(transition.topology, node_id) + in_edge_ids = transition.topology.get_edge_ids_ingoing_to_node(node_id) + out_edge_ids = transition.topology.get_edge_ids_outgoing_from_node(node_id) + in_helicity_list = get_sorted_states(transition, in_edge_ids) + out_helicity_list = get_sorted_states(transition, out_edge_ids) + return ( + in_helicity_list[0], + (out_helicity_list[0], out_helicity_list[1]), + ) + + +def get_sorted_states( + transition: StateTransition, state_ids: Iterable[int] +) -> List[State]: + """Get a sorted list of `~qrules.transition.State` instances. + + In order to ensure correct naming of amplitude coefficients the list has to + be sorted by name. The same coefficient names have to be created for two + transitions that only differ from a kinematic standpoint (swapped external + edges). + """ + states = [transition.states[i] for i in state_ids] + return sorted(states, key=lambda s: s.particle.name) diff --git a/src/ampform/helicity/naming.py b/src/ampform/helicity/naming.py new file mode 100644 index 000000000..e1108d728 --- /dev/null +++ b/src/ampform/helicity/naming.py @@ -0,0 +1,234 @@ +"""Generate descriptions used in the `~ampform.helicity` formalism.""" + +from typing import Dict, List, Optional, Tuple + +import sympy as sp +from qrules.transition import State, StateTransition + +from .decay import ( + get_angular_momentum, + get_coupled_spin, + get_helicity_info, + get_sorted_states, +) + + +class HelicityAmplitudeNameGenerator: + def __init__(self) -> None: + self.parity_partner_coefficient_mapping: Dict[str, str] = {} + + def register_amplitude_coefficient_name( + self, transition: StateTransition + ) -> None: + for node_id in transition.topology.nodes: + ( + coefficient_suffix, + parity_partner_coefficient_suffix, + priority_partner_coefficient_suffix, + ) = self.__generate_amplitude_coefficient_couple( + transition, node_id + ) + + if transition.interactions[node_id].parity_prefactor is None: + continue + + if ( + coefficient_suffix + not in self.parity_partner_coefficient_mapping + ): + if ( + parity_partner_coefficient_suffix + in self.parity_partner_coefficient_mapping + ): + if ( + parity_partner_coefficient_suffix + == priority_partner_coefficient_suffix + ): + self.parity_partner_coefficient_mapping[ + coefficient_suffix + ] = parity_partner_coefficient_suffix + else: + self.parity_partner_coefficient_mapping[ + parity_partner_coefficient_suffix + ] = coefficient_suffix + self.parity_partner_coefficient_mapping[ + coefficient_suffix + ] = coefficient_suffix + + else: + # if neither this coefficient nor its partner are registered just add it + self.parity_partner_coefficient_mapping[ + coefficient_suffix + ] = coefficient_suffix + + def __generate_amplitude_coefficient_couple( + self, transition: StateTransition, node_id: int + ) -> Tuple[str, str, str]: + incoming_state, outgoing_states = get_helicity_info( + transition, node_id + ) + par_name_suffix = self.generate_coefficient_name(transition, node_id) + + pp_par_name_suffix = ( + _state_to_str(incoming_state, use_helicity=False) + + R" \to " + + " ".join( + _state_to_str(s, make_parity_partner=True) + for s in outgoing_states + ) + ) + + priority_name_suffix = par_name_suffix + if outgoing_states[0].spin_projection < 0 or ( + outgoing_states[0].spin_projection == 0 + and outgoing_states[1].spin_projection < 0 + ): + priority_name_suffix = pp_par_name_suffix + + return (par_name_suffix, pp_par_name_suffix, priority_name_suffix) + + def generate_amplitude_name( # pylint: disable=no-self-use + self, + transition: StateTransition, + node_id: Optional[int] = None, + ) -> str: + """Generates a unique name for the amplitude corresponding. + + That is, corresponging to the given + `~qrules.transition.StateTransition`. If ``node_id`` is given, it + generates a unique name for the partial amplitude corresponding to the + interaction node of the given `~qrules.transition.StateTransition`. + """ + name = "" + if node_id is None: + node_ids = transition.topology.nodes + else: + node_ids = frozenset({node_id}) + names: List[str] = [] + for i in node_ids: + incoming_state, outgoing_states = get_helicity_info(transition, i) + name = ( + _state_to_str(incoming_state) + + R" \to " + + " ".join(_state_to_str(s) for s in outgoing_states) + ) + names.append(name) + return "; ".join(names) + + def generate_coefficient_name( # pylint: disable=no-self-use + self, transition: StateTransition, node_id: int + ) -> str: + """Generate partial amplitude coefficient name suffix.""" + in_hel_info, out_hel_info = get_helicity_info(transition, node_id) + return ( + _state_to_str(in_hel_info, use_helicity=False) + + R" \to " + + " ".join(_state_to_str(s) for s in out_hel_info) + ) + + def generate_sequential_amplitude_suffix( + self, transition: StateTransition + ) -> str: + """Generate unique suffix for a sequential amplitude transition.""" + coefficient_names: List[str] = [] + for node_id in transition.topology.nodes: + suffix = self.generate_coefficient_name(transition, node_id) + if suffix in self.parity_partner_coefficient_mapping: + suffix = self.parity_partner_coefficient_mapping[suffix] + coefficient_names.append(suffix) + return "; ".join(coefficient_names) + + +class CanonicalAmplitudeNameGenerator(HelicityAmplitudeNameGenerator): + def generate_amplitude_name( + self, + transition: StateTransition, + node_id: Optional[int] = None, + ) -> str: + if isinstance(node_id, int): + node_ids = frozenset({node_id}) + else: + node_ids = transition.topology.nodes + names: List[str] = [] + for node in node_ids: + helicity_name = super().generate_amplitude_name(transition, node) + canonical_name = helicity_name.replace( + R" \to ", + self.__generate_ls_arrow(transition, node), + ) + names.append(canonical_name) + return "; ".join(names) + + def generate_coefficient_name( + self, transition: StateTransition, node_id: int + ) -> str: + incoming_state, outgoing_states = get_helicity_info( + transition, node_id + ) + return ( + _state_to_str(incoming_state, use_helicity=False) + + self.__generate_ls_arrow(transition, node_id) + + " ".join( + _state_to_str(s, use_helicity=False) for s in outgoing_states + ) + ) + + def __generate_ls_arrow( + self, transition: StateTransition, node_id: int + ) -> str: + angular_momentum, spin = self.__get_ls_coupling(transition, node_id) + return fR" \xrightarrow[S={spin}]{{L={angular_momentum}}} " + + @staticmethod + def __get_ls_coupling( + transition: StateTransition, node_id: int + ) -> Tuple[sp.Rational, sp.Rational]: + interaction = transition.interactions[node_id] + ang_orb_mom = sp.Rational(get_angular_momentum(interaction).magnitude) + spin = sp.Rational(get_coupled_spin(interaction).magnitude) + return ang_orb_mom, spin + + +def generate_transition_label(transition: StateTransition) -> str: + initial_state_ids = transition.topology.incoming_edge_ids + final_state_ids = transition.topology.outgoing_edge_ids + initial_states = get_sorted_states(transition, initial_state_ids) + final_states = get_sorted_states(transition, final_state_ids) + return ( + _state_to_str(initial_states[0]) + + R" \to " + + " ".join(_state_to_str(s) for s in final_states) + ) + + +def _state_to_str( + state: State, + use_helicity: bool = True, + make_parity_partner: bool = False, +) -> str: + if state.particle.latex is not None: + output_string = state.particle.latex + else: + output_string = state.particle.name + if use_helicity: + if make_parity_partner: + helicity = -1 * state.spin_projection + else: + helicity = state.spin_projection + helicity_str = _render_float(helicity) + output_string += f"_{{{helicity_str}}}" + return output_string + + +def _render_float(value: float) -> str: + """Render a `float` nicely as a string. + + >>> _render_float(-0.5) + '-1/2' + >>> _render_float(1) + '+1' + """ + rational = sp.Rational(value) + if value > 0: + return f"+{rational}" + return str(rational) diff --git a/src/ampform/kinematics.py b/src/ampform/kinematics.py index c764ea967..e558a3abe 100644 --- a/src/ampform/kinematics.py +++ b/src/ampform/kinematics.py @@ -2,7 +2,7 @@ """Kinematics of an amplitude model in the helicity formalism.""" import textwrap -from typing import Dict, Mapping, Set, Tuple +from typing import Dict, List, Mapping, Set, Tuple import attr import numpy as np @@ -11,10 +11,6 @@ from qrules.topology import Topology, create_isobar_topologies from qrules.transition import ReactionInfo, StateTransition -from ._transition_info import ( - assert_isobar_topology, - determine_attached_final_state, -) from .data import ( DataSet, EventCollection, @@ -328,3 +324,39 @@ def _compute_invariant_masses( name = get_invariant_mass_label(topology, state_id) invariant_masses[name] = values return DataSet(invariant_masses) + + +def assert_isobar_topology(topology: Topology) -> None: + for node_id in topology.nodes: + assert_two_body_decay(topology, node_id) + + +def assert_two_body_decay(topology: Topology, node_id: int) -> None: + parent_state_ids = topology.get_edge_ids_ingoing_to_node(node_id) + if len(parent_state_ids) != 1: + raise ValueError( + f"Node {node_id} has {len(parent_state_ids)} parent states," + " so this is not an isobar decay" + ) + child_state_ids = topology.get_edge_ids_outgoing_from_node(node_id) + if len(child_state_ids) != 2: + raise ValueError( + f"Node {node_id} decays to {len(child_state_ids)} states," + " so this is not an isobar decay" + ) + + +def determine_attached_final_state( + topology: Topology, state_id: int +) -> List[int]: + """Determine all final state particles of a transition. + + These are attached downward (forward in time) for a given edge (resembling + the root). + """ + edge = topology.edges[state_id] + if edge.ending_node_id is None: + return [state_id] + return sorted( + topology.get_originating_final_state_edge_ids(edge.ending_node_id) + ) diff --git a/tests/conftest.py b/tests/conftest.py index 915c6479f..2ec3f0746 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -46,7 +46,7 @@ def amplitude_model(reaction: ReactionInfo) -> Tuple[str, HelicityModel]: model_builder.set_dynamics( name, create_relativistic_breit_wigner_with_ff ) - model = model_builder.generate() + model = model_builder.formulate() return reaction.formalism, model diff --git a/tests/helicity/__init__.py b/tests/helicity/__init__.py new file mode 100644 index 000000000..948df262f --- /dev/null +++ b/tests/helicity/__init__.py @@ -0,0 +1 @@ +"""Required to set mypy options for the tests folder.""" diff --git a/tests/helicity/test_decay.py b/tests/helicity/test_decay.py new file mode 100644 index 000000000..31d6181d6 --- /dev/null +++ b/tests/helicity/test_decay.py @@ -0,0 +1,52 @@ +# pylint: disable=no-member, no-self-use +from typing import Optional + +import pytest +from qrules.particle import Particle +from qrules.quantum_numbers import InteractionProperties + +from ampform.helicity.decay import StateWithID, TwoBodyDecay + + +def _create_dummy_decay( + l_magnitude: Optional[int], spin_magnitude: float +) -> TwoBodyDecay: + dummy = Particle(name="dummy", pid=123, spin=spin_magnitude, mass=1.0) + return TwoBodyDecay( + parent=StateWithID( + id=0, particle=dummy, spin_projection=spin_magnitude + ), + children=( + StateWithID(id=1, particle=dummy, spin_projection=0.0), + StateWithID(id=2, particle=dummy, spin_projection=0.0), + ), + interaction=InteractionProperties(l_magnitude=l_magnitude), + ) + + +class TestTwoBodyDecay: + @pytest.mark.parametrize( + ("decay", "expected_l"), + [ + (_create_dummy_decay(1, 0.5), 1), + (_create_dummy_decay(0, 1.0), 0), + (_create_dummy_decay(2, 1.0), 2), + (_create_dummy_decay(None, 0.0), 0), + (_create_dummy_decay(None, 1.0), 1), + ], + ) + def test_extract_angular_momentum( + self, decay: TwoBodyDecay, expected_l: int + ): + assert expected_l == decay.extract_angular_momentum() + + @pytest.mark.parametrize( + "decay", + [ + _create_dummy_decay(None, 0.5), + _create_dummy_decay(None, 1.5), + ], + ) + def test_invalid_angular_momentum(self, decay: TwoBodyDecay): + with pytest.raises(ValueError, match="not integral"): + decay.extract_angular_momentum() diff --git a/tests/helicity/test_helicity.py b/tests/helicity/test_helicity.py new file mode 100644 index 000000000..b2a547d6f --- /dev/null +++ b/tests/helicity/test_helicity.py @@ -0,0 +1,106 @@ +# pylint: disable=no-member, no-self-use +import pytest +import sympy as sp +from qrules import ReactionInfo +from sympy import cos, sin, sqrt + +from ampform import get_builder +from ampform.helicity import ( + _generate_kinematic_variables, + formulate_wigner_d, + group_transitions, +) + + +class TestAmplitudeBuilder: + def test_formulate(self, reaction: ReactionInfo): + if reaction.formalism == "canonical-helicity": + n_amplitudes = 16 + n_parameters = 4 + else: + n_amplitudes = 8 + n_parameters = 2 + + model_builder = get_builder(reaction) + model = model_builder.formulate() + assert len(model.parameter_defaults) == n_parameters + assert len(model.components) == 4 + n_amplitudes + assert len(model.expression.free_symbols) == 4 + n_parameters + + no_dynamics: sp.Expr = model.expression.doit() + no_dynamics = no_dynamics.subs(model.parameter_defaults) + assert len(no_dynamics.free_symbols) == 1 + + existing_theta = next(iter(no_dynamics.free_symbols)) + theta = sp.Symbol("theta", real=True) + no_dynamics = no_dynamics.subs({existing_theta: theta}) + no_dynamics = no_dynamics.trigsimp() + + if reaction.formalism == "canonical-helicity": + assert ( + no_dynamics + == 0.8 * sqrt(10) * cos(theta) ** 2 + + 4.4 * cos(theta) ** 2 + + 0.8 * sqrt(10) + + 4.4 + ) + else: + assert no_dynamics == 8.0 - 4.0 * sin(theta) ** 2 + + +@pytest.mark.parametrize( + ("node_id", "mass", "phi", "theta"), + [ + (0, "m_012", "phi_1+2", "theta_1+2"), + (1, "m_12", "phi_1,1+2", "theta_1,1+2"), + ], +) +def test_generate_kinematic_variables( + reaction: ReactionInfo, + node_id: int, + mass: str, + phi: str, + theta: str, +): + for transition in reaction.transitions: + variables = _generate_kinematic_variables(transition, node_id) + assert variables[0].name == mass + assert variables[1].name == phi + assert variables[2].name == theta + + +@pytest.mark.parametrize( + ("transition", "node_id", "expected"), + [ + (0, 0, "WignerD(1, -1, 1, -phi_1+2, theta_1+2, 0)"), + (0, 1, "WignerD(0, 0, 0, -phi_1,1+2, theta_1,1+2, 0)"), + (1, 0, "WignerD(1, -1, -1, -phi_1+2, theta_1+2, 0)"), + (1, 1, "WignerD(0, 0, 0, -phi_1,1+2, theta_1,1+2, 0)"), + (2, 0, "WignerD(1, 1, 1, -phi_1+2, theta_1+2, 0)"), + (2, 1, "WignerD(0, 0, 0, -phi_1,1+2, theta_1,1+2, 0)"), + ], +) +def test_formulate_wigner_d( + reaction: ReactionInfo, transition: int, node_id: int, expected: str +): + if reaction.formalism == "canonical-helicity": + transition *= 2 + transitions = [ + t + for t in reaction.transitions + if t.states[3].particle.name == "f(0)(980)" + ] + some_transition = transitions[transition] + wigner_d = formulate_wigner_d(some_transition, node_id) + assert str(wigner_d) == expected + + +def test_group_transitions(reaction: ReactionInfo): + transition_groups = group_transitions(reaction.transitions) + assert len(transition_groups) == 4 + for group in transition_groups: + transition_iter = iter(group) + first_transition = next(transition_iter) + for transition in transition_iter: + assert transition.initial_states == first_transition.initial_states + assert transition.final_states == first_transition.final_states diff --git a/tests/helicity/test_naming.py b/tests/helicity/test_naming.py new file mode 100644 index 000000000..1ed688b34 --- /dev/null +++ b/tests/helicity/test_naming.py @@ -0,0 +1,14 @@ +from qrules import ReactionInfo + +from ampform.helicity.naming import _render_float, generate_transition_label + + +def test_generate_transition_label(reaction: ReactionInfo): + for transition in reaction.transitions: + label = generate_transition_label(transition) + jpsi_spin = _render_float(transition.states[-1].spin_projection) + gamma_spin = _render_float(transition.states[0].spin_projection) + assert label == ( + fR"J/\psi(1S)_{{{jpsi_spin}}} \to \gamma_{{{gamma_spin}}}" + R" \pi^{0}_{0} \pi^{0}_{0}" + ) diff --git a/tests/test_angular_distributions.py b/tests/test_angular_distributions.py index 701fcc79a..982615ed2 100644 --- a/tests/test_angular_distributions.py +++ b/tests/test_angular_distributions.py @@ -1,6 +1,4 @@ -# cspell:ignore nsimplify # pylint: disable=redefined-outer-name,no-self-use - from typing import Any, List, Optional, Sequence, Union import pytest @@ -23,19 +21,16 @@ def calculate_sympy_integral( intensity *= sp.sin(int_var) else: intensity *= jacobi_determinant + integral = sp.integrate( + intensity, + *( + (x, -sp.pi, sp.pi) if "phi" in x.name else (x, 0, sp.pi) + for x in integration_variables + ), + ) return sp.trigsimp( sp.nsimplify( - sp.re( - sp.integrate( - intensity, - *( - (x, -sp.pi, sp.pi) - if "phi" in x.name - else (x, 0, sp.pi) - for x in integration_variables - ), - ) - ).doit(), + sp.re(integral).doit(), rational=True, ) ) @@ -77,7 +72,7 @@ def sympy_model(self, particle_database: ParticleCollection) -> sp.Expr: particle_db=particles, ) - amplitude_model = get_builder(reaction).generate() + amplitude_model = get_builder(reaction).formulate() full_model = sp.simplify( amplitude_model.expression.subs(amplitude_model.parameter_defaults) .doit() @@ -161,7 +156,7 @@ def sympy_model(self) -> sp.Expr: allowed_interaction_types="strong", formalism="helicity", ) - amplitude_model = get_builder(reaction).generate() + amplitude_model = get_builder(reaction).formulate() coefficient = sp.Symbol( R"C_{D_{1}(2420)^{0} \to D^{*}(2010)^{+}_{0} \pi^{-}_{0}; " diff --git a/tests/test_angular_momentum.py b/tests/test_angular_momentum.py deleted file mode 100644 index eb685c7f5..000000000 --- a/tests/test_angular_momentum.py +++ /dev/null @@ -1,59 +0,0 @@ -from typing import Optional - -import pytest -from qrules.particle import Particle -from qrules.quantum_numbers import InteractionProperties -from qrules.topology import Edge, Topology -from qrules.transition import State, StateTransition - -from ampform.helicity import _extract_angular_momentum - - -def _create_dummy_transition( - l_magnitude: Optional[int], spin_magnitude: float -) -> StateTransition: - topology = Topology( - nodes={0}, # type: ignore - edges={ - 0: Edge(None, 0), - 1: Edge(0, None), - 2: Edge(0, None), - }, - ) - particle = Particle(name="dummy", pid=123, spin=spin_magnitude, mass=1.0) - state = State(particle, spin_magnitude) - return StateTransition( - topology, - interactions={0: InteractionProperties(l_magnitude=l_magnitude)}, - states={0: state, 1: state, 2: state}, - ) - - -@pytest.mark.parametrize( - ("transition", "expected_l"), - [ - (_create_dummy_transition(1, 0.5), 1), - (_create_dummy_transition(0, 1.0), 0), - (_create_dummy_transition(2, 1.0), 2), - (_create_dummy_transition(None, 0.0), 0), - (_create_dummy_transition(None, 1.0), 1), - ], -) -def test_extract_angular_momentum( - transition: StateTransition, expected_l: int -) -> None: - assert expected_l == _extract_angular_momentum(transition, 0) - - -@pytest.mark.parametrize( - "transition", - [ - _create_dummy_transition(None, 0.5), - _create_dummy_transition(None, 1.5), - ], -) -def test_invalid_angular_momentum( - transition: StateTransition, -) -> None: - with pytest.raises(ValueError, match="not integral"): - _extract_angular_momentum(transition, 0) diff --git a/tests/test_helicity.py b/tests/test_helicity.py deleted file mode 100644 index 469409436..000000000 --- a/tests/test_helicity.py +++ /dev/null @@ -1,39 +0,0 @@ -import sympy as sp -from qrules import ReactionInfo -from sympy import cos, sin, sqrt - -from ampform import get_builder - - -def test_generate(reaction: ReactionInfo): - if reaction.formalism == "canonical-helicity": - n_amplitudes = 16 - n_parameters = 4 - else: - n_amplitudes = 8 - n_parameters = 2 - - model = get_builder(reaction).generate() - assert len(model.parameter_defaults) == n_parameters - assert len(model.components) == 4 + n_amplitudes - assert len(model.expression.free_symbols) == 4 + n_parameters - - no_dynamics: sp.Expr = model.expression.doit() - no_dynamics = no_dynamics.subs(model.parameter_defaults) - assert len(no_dynamics.free_symbols) == 1 - - existing_theta = next(iter(no_dynamics.free_symbols)) - theta = sp.Symbol("theta", real=True) - no_dynamics = no_dynamics.subs({existing_theta: theta}) - no_dynamics = no_dynamics.trigsimp() - - if reaction.formalism == "canonical-helicity": - assert ( - no_dynamics - == 0.8 * sqrt(10) * cos(theta) ** 2 - + 4.4 * cos(theta) ** 2 - + 0.8 * sqrt(10) - + 4.4 - ) - else: - assert no_dynamics == 8.0 - 4.0 * sin(theta) ** 2 diff --git a/tests/test_kinematics.py b/tests/test_kinematics.py index fa5948a98..ea2fb3e9e 100644 --- a/tests/test_kinematics.py +++ b/tests/test_kinematics.py @@ -8,6 +8,7 @@ from ampform.kinematics import ( _compute_helicity_angles, _compute_invariant_masses, + determine_attached_final_state, ) @@ -137,3 +138,22 @@ def test_compute_invariant_masses(data_sample: EventCollection): pytest.approx(invariant_masses["m_23"]) == data_sample.sum([2, 3]).mass() ) + + +def test_determine_attached_final_state(): + topologies = create_isobar_topologies(4) + # outer states + for topology in topologies: + for i in topology.outgoing_edge_ids: + assert determine_attached_final_state(topology, state_id=i) == [i] + for i in topology.incoming_edge_ids: + assert determine_attached_final_state( + topology, state_id=i + ) == list(topology.outgoing_edge_ids) + # intermediate states + topology = topologies[0] + assert determine_attached_final_state(topology, state_id=4) == [0, 1] + assert determine_attached_final_state(topology, state_id=5) == [2, 3] + topology = topologies[1] + assert determine_attached_final_state(topology, state_id=4) == [1, 2, 3] + assert determine_attached_final_state(topology, state_id=5) == [2, 3] diff --git a/tests/test_parity_prefactor.py b/tests/test_parity_prefactor.py index a97b07519..273f857f9 100644 --- a/tests/test_parity_prefactor.py +++ b/tests/test_parity_prefactor.py @@ -56,5 +56,5 @@ def test_parity_amplitude_coupling(test_input: Input, n_parameters: int): reaction = stm.find_solutions(problem_sets) model_builder = get_builder(reaction) - amplitude_model = model_builder.generate() + amplitude_model = model_builder.formulate() assert len(amplitude_model.parameter_defaults) == n_parameters diff --git a/tests/test_transition_info.py b/tests/test_transition_info.py deleted file mode 100644 index 6a909e26a..000000000 --- a/tests/test_transition_info.py +++ /dev/null @@ -1,22 +0,0 @@ -from qrules.topology import create_isobar_topologies - -from ampform._transition_info import determine_attached_final_state - - -def test_determine_attached_final_state(): - topologies = create_isobar_topologies(4) - # outer states - for topology in topologies: - for i in topology.outgoing_edge_ids: - assert determine_attached_final_state(topology, state_id=i) == [i] - for i in topology.incoming_edge_ids: - assert determine_attached_final_state( - topology, state_id=i - ) == list(topology.outgoing_edge_ids) - # intermediate states - topology = topologies[0] - assert determine_attached_final_state(topology, state_id=4) == [0, 1] - assert determine_attached_final_state(topology, state_id=5) == [2, 3] - topology = topologies[1] - assert determine_attached_final_state(topology, state_id=4) == [1, 2, 3] - assert determine_attached_final_state(topology, state_id=5) == [2, 3]