diff --git a/requirements-dev.txt b/requirements-dev.txt index 647280ff..4f090b4c 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -8,3 +8,4 @@ types-PyYAML~=6.0.12.12 antlr4-tools~=0.2.1 pandas~=2.0.3 pandas-stubs<=2.0.3 +types-PyYAML~=6.0.12 diff --git a/src/andromede/expression/__init__.py b/src/andromede/expression/__init__.py index 70c8c8ac..5b62b67e 100644 --- a/src/andromede/expression/__init__.py +++ b/src/andromede/expression/__init__.py @@ -11,7 +11,6 @@ # This file is part of the Antares project. from .copy import CopyVisitor, copy_expression -from .degree import ExpressionDegreeVisitor, compute_degree from .evaluate_parameters_efficient import ValueProvider from .expression_efficient import ( AdditionNode, diff --git a/src/andromede/expression/degree.py b/src/andromede/expression/degree.py deleted file mode 100644 index 3a5119ac..00000000 --- a/src/andromede/expression/degree.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -import andromede.expression.scenario_operator - -from .expression_efficient import ( - AdditionNode, - ComparisonNode, - ComponentParameterNode, - DivisionNode, - ExpressionNodeEfficient, - LiteralNode, - MultiplicationNode, - NegationNode, - ParameterNode, - PortFieldAggregatorNode, - PortFieldNode, - ScenarioOperatorNode, - SubstractionNode, - TimeAggregatorName, - TimeAggregatorNode, - TimeOperatorName, - TimeOperatorNode, -) -from .visitor import ExpressionVisitor, T, visit - - -class ExpressionDegreeVisitor(ExpressionVisitor[int]): - """ - Computes degree of expression with respect to variables. - """ - - def literal(self, node: LiteralNode) -> int: - return 0 - - def negation(self, node: NegationNode) -> int: - return visit(node.operand, self) - - # TODO: Take into account simplification that can occur with literal coefficient for add, sub, mult, div - def addition(self, node: AdditionNode) -> int: - return max(visit(node.left, self), visit(node.right, self)) - - def substraction(self, node: SubstractionNode) -> int: - return max(visit(node.left, self), visit(node.right, self)) - - def multiplication(self, node: MultiplicationNode) -> int: - return visit(node.left, self) + visit(node.right, self) - - def division(self, node: DivisionNode) -> int: - right_degree = visit(node.right, self) - if right_degree != 0: - raise ValueError("Degree computation not implemented for divisions.") - return visit(node.left, self) - - def comparison(self, node: ComparisonNode) -> int: - return max(visit(node.left, self), visit(node.right, self)) - - # def variable(self, node: VariableNode) -> int: - # return 1 - - def parameter(self, node: ParameterNode) -> int: - return 0 - - # def comp_variable(self, node: ComponentVariableNode) -> int: - # return 1 - - def comp_parameter(self, node: ComponentParameterNode) -> int: - return 0 - - def time_operator(self, node: TimeOperatorNode) -> int: - if node.name in [TimeOperatorName.SHIFT, TimeOperatorName.EVALUATION]: - return visit(node.operand, self) - else: - return NotImplemented - - def time_aggregator(self, node: TimeAggregatorNode) -> int: - if node.name in [TimeAggregatorName.TIME_SUM]: - return visit(node.operand, self) - else: - return NotImplemented - - def scenario_operator(self, node: ScenarioOperatorNode) -> int: - scenario_operator_cls = getattr( - andromede.expression.scenario_operator, node.name - ) - # TODO: Carefully check if this formula is correct - return scenario_operator_cls.degree() * visit(node.operand, self) - - def port_field(self, node: PortFieldNode) -> int: - return 1 - - def port_field_aggregator(self, node: PortFieldAggregatorNode) -> int: - return visit(node.operand, self) - - -def compute_degree(expression: ExpressionNodeEfficient) -> int: - return visit(expression, ExpressionDegreeVisitor()) - - -def is_constant(expr: ExpressionNodeEfficient) -> bool: - """ - True if the expression has no variable. - """ - return compute_degree(expr) == 0 - - -def is_linear(expr: ExpressionNodeEfficient) -> bool: - """ - True if the expression is linear with respect to variables. - """ - return compute_degree(expr) <= 1 diff --git a/src/andromede/expression/evaluate.py b/src/andromede/expression/evaluate.py index 9f091350..94c9eb1d 100644 --- a/src/andromede/expression/evaluate.py +++ b/src/andromede/expression/evaluate.py @@ -13,20 +13,7 @@ from dataclasses import dataclass, field from typing import Dict -from .expression_efficient import ( - ComparisonNode, - ComponentParameterNode, - ExpressionNodeEfficient, - LiteralNode, - ParameterNode, - PortFieldAggregatorNode, - PortFieldNode, - ScenarioOperatorNode, - TimeAggregatorNode, - TimeOperatorNode, -) from .value_provider import TimeScenarioIndex, TimeScenarioIndices, ValueProvider -from .visitor import ExpressionVisitorOperations, visit # Used only for tests @@ -70,66 +57,3 @@ def block_length() -> int: @staticmethod def scenarios() -> int: raise NotImplementedError() - - -@dataclass(frozen=True) -class EvaluationVisitor(ExpressionVisitorOperations[float]): - """ - Evaluates the expression with respect to the provided context - (variables and parameters values). - """ - - context: ValueProvider - - def literal(self, node: LiteralNode) -> float: - return node.value - - def comparison(self, node: ComparisonNode) -> float: - raise ValueError("Cannot evaluate comparison operator.") - - def parameter(self, node: ParameterNode) -> float: - return self.context.get_parameter_value(node.name) - - def comp_parameter(self, node: ComponentParameterNode) -> float: - return self.context.get_component_parameter_value(node.component_id, node.name) - - def time_operator(self, node: TimeOperatorNode) -> float: - raise NotImplementedError() - - def time_aggregator(self, node: TimeAggregatorNode) -> float: - raise NotImplementedError() - - def scenario_operator(self, node: ScenarioOperatorNode) -> float: - raise NotImplementedError() - - def port_field(self, node: PortFieldNode) -> float: - raise NotImplementedError() - - def port_field_aggregator(self, node: PortFieldAggregatorNode) -> float: - raise NotImplementedError() - - -def evaluate( - expression: ExpressionNodeEfficient, value_provider: ValueProvider -) -> float: - return visit(expression, EvaluationVisitor(value_provider)) - - -@dataclass(frozen=True) -class InstancesIndexVisitor(EvaluationVisitor): - """ - Evaluates an expression given as instances index which should have no variable and constant parameter values. - """ - - def parameter(self, node: ParameterNode) -> float: - if not self.context.parameter_is_constant_over_time(node.name): - raise ValueError( - "Parameter given in an instance index expression must be constant over time" - ) - return self.context.get_parameter_value(node.name) - - def time_operator(self, node: TimeOperatorNode) -> float: - raise ValueError("An instance index expression cannot contain time operator") - - def time_aggregator(self, node: TimeAggregatorNode) -> float: - raise ValueError("An instance index expression cannot contain time aggregator") diff --git a/src/andromede/expression/expression_efficient.py b/src/andromede/expression/expression_efficient.py index 29bbe87b..4386b6d6 100644 --- a/src/andromede/expression/expression_efficient.py +++ b/src/andromede/expression/expression_efficient.py @@ -132,7 +132,7 @@ def expec(self) -> "ExpressionNodeEfficient": def variance(self) -> "ExpressionNodeEfficient": return _apply_if_node( - self, lambda x: ScenarioOperatorNode(x, ScenarioOperatorName.Variance) + self, lambda x: ScenarioOperatorNode(x, ScenarioOperatorName.VARIANCE) ) @@ -141,6 +141,8 @@ def wrap_in_node(obj: Any) -> ExpressionNodeEfficient: return obj elif isinstance(obj, float) or isinstance(obj, int): return LiteralNode(float(obj)) + # else: + # return None # Do not raise excpetion so that we can return NotImplemented in _apply_if_node # raise TypeError(f"Unable to wrap {obj} into an expression node") diff --git a/src/andromede/expression/linear_expression_efficient.py b/src/andromede/expression/linear_expression_efficient.py index e2e267e9..3ecc136c 100644 --- a/src/andromede/expression/linear_expression_efficient.py +++ b/src/andromede/expression/linear_expression_efficient.py @@ -21,6 +21,8 @@ Callable, Dict, List, + Literal, + Mapping, Optional, Sequence, TypeVar, @@ -31,10 +33,7 @@ from .context_adder import add_component_context from .equality import expressions_equal -from .evaluate_parameters_efficient import ( - check_resolved_expr, - resolve_coefficient, -) +from .evaluate_parameters_efficient import check_resolved_expr, resolve_coefficient from .expression_efficient import ( ExpressionNodeEfficient, ExpressionRange, @@ -65,11 +64,7 @@ TimeShift, TimeSum, ) -from .value_provider import ( - TimeScenarioIndex, - TimeScenarioIndices, - ValueProvider, -) +from .value_provider import TimeScenarioIndex, TimeScenarioIndices, ValueProvider @dataclass(frozen=True) @@ -368,7 +363,7 @@ def __str__(self) -> str: result += f".{str(self.aggregator)}" return result - def sum_connections(self) -> "LinearExpressionEfficient": + def sum_connections(self) -> "PortFieldTerm": if self.aggregator is not None: raise ValueError(f"Port field {str(self)} already has a port aggregator") return dataclasses.replace(self, aggregator=PortSum()) @@ -377,6 +372,10 @@ def sum_connections(self) -> "LinearExpressionEfficient": T_val = TypeVar("T_val", bound=Union[TermEfficient, PortFieldTerm]) +def _get_neutral_term(term: T_val, neutral: float) -> T_val: + return dataclasses.replace(term, coefficient=wrap_in_node(neutral)) + + @overload def _merge_dicts( lhs: Dict[TermKeyEfficient, TermEfficient], @@ -397,10 +396,6 @@ def _merge_dicts( ... -def _get_neutral_term(term: T_val, neutral: float) -> T_val: - return dataclasses.replace(term, coefficient=neutral) - - def _merge_dicts(lhs, rhs, merge_func, neutral): res = {} for k, v in lhs.items(): @@ -821,7 +816,7 @@ def sum( def _apply_operator( self, - sum_args: Dict[ + sum_args: Mapping[ str, Union[ int, @@ -839,13 +834,6 @@ def _apply_operator( return result_terms - # def sum_connections(self) -> "ExpressionNode": - # if isinstance(self, PortFieldNode): - # return PortFieldAggregatorNode(self, aggregator=PortFieldAggregatorName.PORT_SUM) - # raise ValueError( - # f"sum_connections() applies only for PortFieldNode, whereas the current node is of type {type(self)}." - # ) - def shift( self, expressions: Union[ @@ -1036,7 +1024,7 @@ def linear_expressions_equal_if_present( # TODO: Is this function useful ? Could we just rely on the sum operator overloading ? Only the case with an empty list may make the function useful def sum_expressions( expressions: Sequence[LinearExpressionEfficient], -) -> LinearExpressionEfficient: +) -> Union[LinearExpressionEfficient, Literal[0]]: if len(expressions) == 0: return wrap_in_linear_expr(literal(0)) else: @@ -1059,7 +1047,7 @@ def __post_init__( for bound in [self.lower_bound, self.upper_bound]: if not bound.is_constant(): raise ValueError( - f"The bounds of a constraint should not contain variables, {print_expr(bound)} was given." + f"The bounds of a constraint should not contain variables, {str(bound)} was given." ) def __str__(self) -> str: diff --git a/src/andromede/expression/port_operator.py b/src/andromede/expression/port_operator.py index 56f18322..845ae693 100644 --- a/src/andromede/expression/port_operator.py +++ b/src/andromede/expression/port_operator.py @@ -30,5 +30,5 @@ class PortAggregator: @dataclass(frozen=True) class PortSum(PortAggregator): - def __str__(self): + def __str__(self) -> str: return "PortSum" diff --git a/src/andromede/expression/time_operator.py b/src/andromede/expression/time_operator.py index 3b8e7bce..1332b2dc 100644 --- a/src/andromede/expression/time_operator.py +++ b/src/andromede/expression/time_operator.py @@ -36,12 +36,9 @@ class TimeOperator(ABC): def rolling(cls) -> bool: raise NotImplementedError - def key(self) -> Tuple[int, ...]: + def key(self) -> InstancesTimeIndex: return self.time_ids - def size(self) -> int: - return len(self.time_ids.expressions) - @dataclass(frozen=True) class TimeShift(TimeOperator): diff --git a/src/andromede/model/probability_law.py b/src/andromede/model/probability_law.py deleted file mode 100644 index 62e3dc6a..00000000 --- a/src/andromede/model/probability_law.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -""" -Describes probability distributions used in the models -""" - -from abc import ABC -from dataclasses import dataclass -from typing import List - -import numpy as np - -from andromede.expression.expression import ExpressionNode - - -class AbstractProbabilityLaw(ABC): - def get_sample(self, size: int) -> List[float]: - return NotImplemented - - -@dataclass(frozen=True) -class Normal(AbstractProbabilityLaw): - mean: ExpressionNode - standard_deviation: ExpressionNode - - def get_sample(self, size: int) -> List[float]: - return NotImplemented - - -@dataclass(frozen=True) -class Uniform(AbstractProbabilityLaw): - lower_bound: ExpressionNode - upper_bound: ExpressionNode - - def get_sample(self, size: int) -> List[float]: - return NotImplemented - - -@dataclass(frozen=True) -class UniformIntegers(AbstractProbabilityLaw): - lower_bound: ExpressionNode - upper_bound: ExpressionNode - - def get_sample(self, size: int) -> List[float]: - return NotImplemented diff --git a/src/andromede/model/variable.py b/src/andromede/model/variable.py index e418d8a3..28343e42 100644 --- a/src/andromede/model/variable.py +++ b/src/andromede/model/variable.py @@ -18,6 +18,7 @@ from andromede.expression.linear_expression_efficient import ( LinearExpressionEfficient, linear_expressions_equal_if_present, + wrap_in_linear_expr, wrap_in_linear_expr_if_present, ) from andromede.model.common import ( @@ -80,7 +81,14 @@ def bool_var( structure: IndexingStructure = IndexingStructure(True, True), context: ProblemContext = ProblemContext.OPERATIONAL, ) -> Variable: - return Variable(name, ValueType.BOOL, literal(0), literal(1), structure, context) + return Variable( + name, + ValueType.BOOL, + wrap_in_linear_expr(literal(0)), + wrap_in_linear_expr(literal(1)), + structure, + context, + ) def float_variable( diff --git a/src/andromede/simulation/linear_expression.py b/src/andromede/simulation/linear_expression.py deleted file mode 100644 index c491f6ce..00000000 --- a/src/andromede/simulation/linear_expression.py +++ /dev/null @@ -1,416 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -""" -Specific modelling for "instantiated" linear expressions, -with only variables and literal coefficients. -""" -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional, TypeVar, Union - -from andromede.expression.indexing_structure import IndexingStructure -from andromede.expression.scenario_operator import ScenarioAggregator -from andromede.expression.time_operator import TimeAggregator, TimeOperator - -T = TypeVar("T") - -EPS = 10 ** (-16) - - -def is_close_abs(value: float, other_value: float, eps: float) -> bool: - return abs(value - other_value) < eps - - -def is_zero(value: float) -> bool: - return is_close_abs(value, 0, EPS) - - -def is_one(value: float) -> bool: - return is_close_abs(value, 1, EPS) - - -def is_minus_one(value: float) -> bool: - return is_close_abs(value, -1, EPS) - - -@dataclass(frozen=True) -class TermKey: - """ - Utility class to provide key for a term that contains all term information except coefficient - """ - - component_id: str - variable_name: str - time_operator: Optional[TimeOperator] - time_aggregator: Optional[TimeAggregator] - scenario_aggregator: Optional[ScenarioAggregator] - - -@dataclass(frozen=True) -class Term: - """ - One term in a linear expression: for example the "10x" par in "10x + 5y + 5" - - Args: - coefficient: the coefficient for that term, for example "10" in "10x" - variable_name: the name of the variable, for example "x" in "10x" - """ - - coefficient: float - component_id: str - variable_name: str - structure: IndexingStructure = field( - default=IndexingStructure(time=True, scenario=True) - ) - time_operator: Optional[TimeOperator] = None - time_aggregator: Optional[TimeAggregator] = None - scenario_aggregator: Optional[ScenarioAggregator] = None - - # TODO: It may be useful to define __add__, __sub__, etc on terms, which should return a linear expression ? - - def is_zero(self) -> bool: - return is_zero(self.coefficient) - - def str_for_coeff(self) -> str: - str_for_coeff = "" - if is_one(self.coefficient): - str_for_coeff = "+" - elif is_minus_one(self.coefficient): - str_for_coeff = "-" - else: - str_for_coeff = "{:+g}".format(self.coefficient) - return str_for_coeff - - def __str__(self) -> str: - # Useful for debugging tests - result = self.str_for_coeff() + str(self.variable_name) - if self.time_operator is not None: - result += f".{str(self.time_operator)}" - if self.time_aggregator is not None: - result += f".{str(self.time_aggregator)}" - if self.scenario_aggregator is not None: - result += f".{str(self.scenario_aggregator)}" - return result - - def number_of_instances(self) -> int: - if self.time_aggregator is not None: - return self.time_aggregator.size() - else: - if self.time_operator is not None: - return self.time_operator.size() - else: - return 1 - - -def generate_key(term: Term) -> TermKey: - return TermKey( - term.component_id, - term.variable_name, - term.time_operator, - term.time_aggregator, - term.scenario_aggregator, - ) - - -def _merge_dicts( - lhs: Dict[TermKey, Term], - rhs: Dict[TermKey, Term], - merge_func: Callable[[Term, Term], Term], - neutral: float, -) -> Dict[TermKey, Term]: - res = {} - for k, v in lhs.items(): - res[k] = merge_func( - v, - rhs.get( - k, - Term( - neutral, - v.component_id, - v.variable_name, - v.structure, - v.time_operator, - v.time_aggregator, - v.scenario_aggregator, - ), - ), - ) - for k, v in rhs.items(): - if k not in lhs: - res[k] = merge_func( - Term( - neutral, - v.component_id, - v.variable_name, - v.structure, - v.time_operator, - v.time_aggregator, - v.scenario_aggregator, - ), - v, - ) - return res - - -def _merge_is_possible(lhs: Term, rhs: Term) -> None: - if lhs.component_id != rhs.component_id or lhs.variable_name != rhs.variable_name: - raise ValueError("Cannot merge terms for different variables") - if ( - lhs.time_operator != rhs.time_operator - or lhs.time_aggregator != rhs.time_aggregator - or lhs.scenario_aggregator != rhs.scenario_aggregator - ): - raise ValueError("Cannot merge terms with different operators") - if lhs.structure != rhs.structure: - raise ValueError("Cannot merge terms with different structures") - - -def _add_terms(lhs: Term, rhs: Term) -> Term: - _merge_is_possible(lhs, rhs) - return Term( - lhs.coefficient + rhs.coefficient, - lhs.component_id, - lhs.variable_name, - lhs.structure, - lhs.time_operator, - lhs.time_aggregator, - lhs.scenario_aggregator, - ) - - -def _substract_terms(lhs: Term, rhs: Term) -> Term: - _merge_is_possible(lhs, rhs) - return Term( - lhs.coefficient - rhs.coefficient, - lhs.component_id, - lhs.variable_name, - lhs.structure, - lhs.time_operator, - lhs.time_aggregator, - lhs.scenario_aggregator, - ) - - -class LinearExpression: - """ - Represents a linear expression with respect to variable names, for example 10x + 5y + 2. - - Operators may be used for construction. - - Args: - terms: the list of variable terms, for example 10x and 5y in "10x + 5y + 2". - constant: the constant term, for example 2 in "10x + 5y + 2" - - Examples: - Operators may be used for construction: - - >>> LinearExpression([], 10) + LinearExpression([Term(10, "x")], 0) - LinearExpression([Term(10, "x")], 10) - """ - - terms: Dict[TermKey, Term] - constant: float - - def __init__( - self, - terms: Optional[Union[Dict[TermKey, Term], List[Term]]] = None, - constant: Optional[float] = None, - ) -> None: - self.constant = 0 - self.terms = {} - - if constant is not None: - # += b - self.constant = constant - if terms is not None: - # Allows to give two different syntax in the constructor: - # - List[Term] is natural - # - Dict[str, Term] is useful when constructing a linear expression from the terms of another expression - if isinstance(terms, dict): - for term_key, term in terms.items(): - if not term.is_zero(): - self.terms[term_key] = term - elif isinstance(terms, list): - for term in terms: - if not term.is_zero(): - self.terms[generate_key(term)] = term - else: - raise TypeError( - f"Terms must be either of type Dict[str, Term] or List[Term], whereas {terms} is of type {type(terms)}" - ) - - def is_zero(self) -> bool: - return len(self.terms) == 0 and is_zero(self.constant) - - def str_for_constant(self) -> str: - if is_zero(self.constant): - return "" - else: - return "{:+g}".format(self.constant) - - def __str__(self) -> str: - # Useful for debugging tests - result = "" - if self.is_zero(): - result += "0" - else: - for term in self.terms.values(): - result += str(term) - - result += self.str_for_constant() - - return result - - def __eq__(self, rhs: object) -> bool: - return ( - isinstance(rhs, LinearExpression) - and is_close_abs(self.constant, rhs.constant, EPS) - and self.terms - == rhs.terms # /!\ There may be float equality comparison in the terms values - ) - - def __iadd__(self, rhs: "LinearExpression") -> "LinearExpression": - if not isinstance(rhs, LinearExpression): - return NotImplemented - self.constant += rhs.constant - aggregated_terms = _merge_dicts(self.terms, rhs.terms, _add_terms, 0) - self.terms = aggregated_terms - self.remove_zeros_from_terms() - return self - - def __add__(self, rhs: "LinearExpression") -> "LinearExpression": - result = LinearExpression() - result += self - result += rhs - return result - - def __isub__(self, rhs: "LinearExpression") -> "LinearExpression": - if not isinstance(rhs, LinearExpression): - return NotImplemented - self.constant -= rhs.constant - aggregated_terms = _merge_dicts(self.terms, rhs.terms, _substract_terms, 0) - self.terms = aggregated_terms - self.remove_zeros_from_terms() - return self - - def __sub__(self, rhs: "LinearExpression") -> "LinearExpression": - result = LinearExpression() - result += self - result -= rhs - return result - - def __neg__(self) -> "LinearExpression": - result = LinearExpression() - result -= self - return result - - def __imul__(self, rhs: "LinearExpression") -> "LinearExpression": - if not isinstance(rhs, LinearExpression): - return NotImplemented - - if self.terms and rhs.terms: - raise ValueError("Cannot multiply two non constant expression") - else: - if self.terms: - left_expr = self - const_expr = rhs - else: - # It is possible that both expr are constant - left_expr = rhs - const_expr = self - if is_close_abs(const_expr.constant, 0, EPS): - return LinearExpression() - elif is_close_abs(const_expr.constant, 1, EPS): - _copy_expression(left_expr, self) - else: - left_expr.constant *= const_expr.constant - for term_key, term in left_expr.terms.items(): - left_expr.terms[term_key] = Term( - term.coefficient * const_expr.constant, - term.component_id, - term.variable_name, - term.structure, - term.time_operator, - term.time_aggregator, - term.scenario_aggregator, - ) - _copy_expression(left_expr, self) - return self - - def __mul__(self, rhs: "LinearExpression") -> "LinearExpression": - result = LinearExpression() - result += self - result *= rhs - return result - - def __itruediv__(self, rhs: "LinearExpression") -> "LinearExpression": - if not isinstance(rhs, LinearExpression): - return NotImplemented - - if rhs.terms: - raise ValueError("Cannot divide by a non constant expression") - else: - if is_close_abs(rhs.constant, 0, EPS): - raise ZeroDivisionError("Cannot divide expression by zero") - elif is_close_abs(rhs.constant, 1, EPS): - return self - else: - self.constant /= rhs.constant - for term_key, term in self.terms.items(): - self.terms[term_key] = Term( - term.coefficient / rhs.constant, - term.component_id, - term.variable_name, - term.structure, - term.time_operator, - term.time_aggregator, - term.scenario_aggregator, - ) - return self - - def __truediv__(self, rhs: "LinearExpression") -> "LinearExpression": - result = LinearExpression() - result += self - result /= rhs - - return result - - def remove_zeros_from_terms(self) -> None: - # TODO: Not optimized, checks could be done directly when doing operations on self.linear_term to avoid copies - for term_key, term in self.terms.copy().items(): - if is_close_abs(term.coefficient, 0, EPS): - del self.terms[term_key] - - def is_valid(self) -> bool: - nb_instances = None - for term in self.terms.values(): - term_instances = term.number_of_instances() - if nb_instances is None: - nb_instances = term_instances - else: - if term_instances != nb_instances: - raise ValueError( - "The terms of the linear expression {self} do not have the same number of instances" - ) - return True - - def number_of_instances(self) -> int: - if self.is_valid(): - # All terms have the same number of instances, just pick one - return self.terms[next(iter(self.terms))].number_of_instances() - else: - raise ValueError(f"{self} is not a valid linear expression") - - -def _copy_expression(src: LinearExpression, dst: LinearExpression) -> None: - dst.terms = src.terms - dst.constant = src.constant diff --git a/src/andromede/simulation/linear_expression_resolver.py b/src/andromede/simulation/linear_expression_resolver.py index ed28339f..0ae9c736 100644 --- a/src/andromede/simulation/linear_expression_resolver.py +++ b/src/andromede/simulation/linear_expression_resolver.py @@ -52,7 +52,7 @@ def resolve( resolved_variables = self.resolve_variables(term, row_id) # TODO: Contrary to the time aggregator that does a sum which is the default behaviour when append resolved terms, expectation performs an averaging, so weights must be included in coefficients. We feel here that we could generalize time and scenario aggregation over variables with more general operators, the following lines are very specific to expectation with same weights over all scenarios - weight = 1 + weight: float = 1 if isinstance(term.scenario_aggregator, Expectation): weight = 1 / self.value_provider.scenarios() diff --git a/src/andromede/simulation/optimization_context.py b/src/andromede/simulation/optimization_context.py index be7893cb..4c046e03 100644 --- a/src/andromede/simulation/optimization_context.py +++ b/src/andromede/simulation/optimization_context.py @@ -193,16 +193,6 @@ def register_connection_fields_expressions( ) -class TimestepValueProvider(ABC): - """ - Interface which provides numerical values for individual timesteps. - """ - - @abstractmethod - def get_value(self, block_timestep: int, scenario: int) -> float: - raise NotImplementedError() - - def _get_parameter_value( context: OptimizationContext, block_timestep: int, @@ -294,21 +284,6 @@ def scenarios() -> int: return Provider() -@dataclass(frozen=True) -class ExpressionTimestepValueProvider(TimestepValueProvider): - context: "OptimizationContext" - component: Component - expression: LinearExpressionEfficient - - # OptimizationContext has knowledge of the block, so that get_value only needs block_timestep and scenario to get the correct data value - - def get_value(self, block_timestep: int, scenario: int) -> float: - param_value_provider = make_value_provider( - self.context, block_timestep, scenario, self.component - ) - return self.expression.evaluate(param_value_provider) - - def make_data_structure_provider( network: Network, component: Component ) -> IndexingStructureProvider: @@ -345,16 +320,6 @@ class ComponentContext: opt_context: OptimizationContext component: Component - def get_values( - self, expression: LinearExpressionEfficient - ) -> TimestepValueProvider: - """ - The returned value provider will evaluate the provided expression. - """ - return ExpressionTimestepValueProvider( - self.opt_context, self.component, expression - ) - def add_variable( self, block_timestep: int, diff --git a/tests/unittests/expressions/test_linear_expressions.py b/tests/unittests/expressions/test_linear_expressions.py deleted file mode 100644 index 21f916a7..00000000 --- a/tests/unittests/expressions/test_linear_expressions.py +++ /dev/null @@ -1,446 +0,0 @@ -# Copyright (c) 2024, RTE (https://www.rte-france.com) -# -# See AUTHORS.txt -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. -# -# SPDX-License-Identifier: MPL-2.0 -# -# This file is part of the Antares project. - -from typing import Dict - -import pytest - -from andromede.expression.scenario_operator import Expectation -from andromede.expression.time_operator import TimeShift, TimeSum -from andromede.simulation.linear_expression import LinearExpression, Term, TermKey - - -@pytest.mark.parametrize( - "term, expected", - [ - (Term(1, "c", "x"), "+x"), - (Term(-1, "c", "x"), "-x"), - (Term(2.50, "c", "x"), "+2.5x"), - (Term(-3, "c", "x"), "-3x"), - (Term(-3, "c", "x", time_operator=TimeShift(-1)), "-3x.shift(-1)"), - (Term(-3, "c", "x", time_aggregator=TimeSum(True)), "-3x.sum(True)"), - ( - Term( - -3, - "c", - "x", - time_operator=TimeShift([2, 3]), - time_aggregator=TimeSum(False), - ), - "-3x.shift([2, 3]).sum(False)", - ), - (Term(-3, "c", "x", scenario_aggregator=Expectation()), "-3x.expec()"), - ( - Term( - -3, - "c", - "x", - time_aggregator=TimeSum(True), - scenario_aggregator=Expectation(), - ), - "-3x.sum(True).expec()", - ), - ], -) -def test_printing_term(term: Term, expected: str) -> None: - assert str(term) == expected - - -@pytest.mark.parametrize( - "coeff, var_name, constant, expec_str", - [ - (0, "x", 0, "0"), - (1, "x", 0, "+x"), - (1, "x", 1, "+x+1"), - (3.7, "x", 1, "+3.7x+1"), - (0, "x", 1, "+1"), - ], -) -def test_affine_expression_printing_should_reflect_required_formatting( - coeff: float, var_name: str, constant: float, expec_str: str -) -> None: - expr = LinearExpression([Term(coeff, "c", var_name)], constant) - assert str(expr) == expec_str - - -@pytest.mark.parametrize( - "lhs, rhs", - [ - (LinearExpression([], 1) + LinearExpression([], 3), LinearExpression([], 4)), - (LinearExpression([], 4) / LinearExpression([], 2), LinearExpression([], 2)), - (LinearExpression([], 4) * LinearExpression([], 2), LinearExpression([], 8)), - (LinearExpression([], 4) - LinearExpression([], 2), LinearExpression([], 2)), - ], -) -def test_constant_expressions(lhs: LinearExpression, rhs: LinearExpression) -> None: - assert lhs == rhs - - -@pytest.mark.parametrize( - "terms_dict, constant, exp_terms, exp_constant", - [ - ({"x": Term(0, "c", "x")}, 1, {}, 1), - ({"x": Term(1, "c", "x")}, 1, {"x": Term(1, "c", "x")}, 1), - ], -) -def test_instantiate_linear_expression_from_dict( - terms_dict: Dict[TermKey, Term], - constant: float, - exp_terms: Dict[str, Term], - exp_constant: float, -) -> None: - expr = LinearExpression(terms_dict, constant) - assert expr.terms == exp_terms - assert expr.constant == exp_constant - - -@pytest.mark.parametrize( - "e1, e2, expected", - [ - ( - LinearExpression([Term(10, "c", "x")], 1), - LinearExpression([Term(5, "c", "x")], 2), - LinearExpression([Term(15, "c", "x")], 3), - ), - ( - LinearExpression([Term(10, "c1", "x")], 1), - LinearExpression([Term(5, "c2", "x")], 2), - LinearExpression([Term(10, "c1", "x"), Term(5, "c2", "x")], 3), - ), - ( - LinearExpression([Term(10, "c", "x")], 0), - LinearExpression([Term(5, "c", "y")], 0), - LinearExpression([Term(10, "c", "x"), Term(5, "c", "y")], 0), - ), - ( - LinearExpression(), - LinearExpression([Term(10, "c", "x", TimeShift(-1))]), - LinearExpression([Term(10, "c", "x", TimeShift(-1))]), - ), - ( - LinearExpression(), - LinearExpression( - [Term(10, "c", "x", time_aggregator=TimeSum(stay_roll=True))] - ), - LinearExpression( - [Term(10, "c", "x", time_aggregator=TimeSum(stay_roll=True))] - ), - ), - ( - LinearExpression([Term(10, "c", "x")]), - LinearExpression([Term(10, "c", "x", time_operator=TimeShift(-1))]), - LinearExpression( - [Term(10, "c", "x"), Term(10, "c", "x", time_operator=TimeShift(-1))] - ), - ), - ( - LinearExpression([Term(10, "c", "x")]), - LinearExpression( - [ - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - scenario_aggregator=Expectation(), - ) - ] - ), - LinearExpression( - [ - Term(10, "c", "x"), - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - scenario_aggregator=Expectation(), - ), - ] - ), - ), - ], -) -def test_addition( - e1: LinearExpression, e2: LinearExpression, expected: LinearExpression -) -> None: - assert e1 + e2 == expected - - -def test_addition_of_linear_expressions_with_different_number_of_instances_should_raise_value_error() -> ( - None -): - pass - - -def test_operation_that_leads_to_term_with_zero_coefficient_should_be_removed_from_terms() -> ( - None -): - e1 = LinearExpression([Term(10, "c", "x")], 1) - e2 = LinearExpression([Term(10, "c", "x")], 2) - e3 = e2 - e1 - assert e3.terms == {} - - -@pytest.mark.parametrize( - "e1, e2, expected", - [ - ( - LinearExpression([Term(10, "c", "x")], 3), - LinearExpression([], 2), - LinearExpression([Term(20, "c", "x")], 6), - ), - ( - LinearExpression([Term(10, "c", "x")], 3), - LinearExpression([], 1), - LinearExpression([Term(10, "c", "x")], 3), - ), - ( - LinearExpression([Term(10, "c", "x")], 3), - LinearExpression(), - LinearExpression(), - ), - ( - LinearExpression( - [ - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - scenario_aggregator=Expectation(), - ) - ], - 3, - ), - LinearExpression([], 2), - LinearExpression( - [ - Term( - 20, - "c", - "x", - time_operator=TimeShift(-1), - scenario_aggregator=Expectation(), - ) - ], - 6, - ), - ), - ], -) -def test_multiplication( - e1: LinearExpression, e2: LinearExpression, expected: LinearExpression -) -> None: - assert e1 * e2 == expected - assert e2 * e1 == expected - - -def test_multiplication_of_two_non_constant_terms_should_raise_value_error() -> None: - e1 = LinearExpression([Term(10, "c", "x")], 0) - e2 = LinearExpression([Term(5, "c", "x")], 0) - with pytest.raises(ValueError) as exc: - _ = e1 * e2 - assert str(exc.value) == "Cannot multiply two non constant expression" - - -@pytest.mark.parametrize( - "e1, expected", - [ - ( - LinearExpression([Term(10, "c", "x")], 5), - LinearExpression([Term(-10, "c", "x")], -5), - ), - ( - LinearExpression( - [ - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ) - ], - 5, - ), - LinearExpression( - [ - Term( - -10, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ) - ], - -5, - ), - ), - ], -) -def test_negation(e1: LinearExpression, expected: LinearExpression) -> None: - assert -e1 == expected - - -@pytest.mark.parametrize( - "e1, e2, expected", - [ - ( - LinearExpression([Term(10, "c", "x")], 1), - LinearExpression([Term(5, "c", "x")], 2), - LinearExpression([Term(5, "c", "x")], -1), - ), - ( - LinearExpression([Term(10, "c1", "x")], 1), - LinearExpression([Term(5, "c2", "x")], 2), - LinearExpression([Term(10, "c1", "x"), Term(-5, "c2", "x")], -1), - ), - ( - LinearExpression([Term(10, "c", "x")], 0), - LinearExpression([Term(5, "c", "y")], 0), - LinearExpression([Term(10, "c", "x"), Term(-5, "c", "y")], 0), - ), - ( - LinearExpression(), - LinearExpression([Term(10, "c", "x", time_operator=TimeShift(-1))]), - LinearExpression([Term(-10, "c", "x", time_operator=TimeShift(-1))]), - ), - ( - LinearExpression(), - LinearExpression( - [Term(10, "c", "x", time_aggregator=TimeSum(stay_roll=True))] - ), - LinearExpression( - [Term(-10, "c", "x", time_aggregator=TimeSum(stay_roll=True))] - ), - ), - ( - LinearExpression([Term(10, "c", "x")]), - LinearExpression([Term(10, "c", "x", time_operator=TimeShift(-1))]), - LinearExpression( - [Term(10, "c", "x"), Term(-10, "c", "x", time_operator=TimeShift(-1))] - ), - ), - ( - LinearExpression([Term(10, "c", "x")]), - LinearExpression( - [ - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ) - ] - ), - LinearExpression( - [ - Term(10, "c", "x"), - Term( - -10, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ), - ] - ), - ), - ], -) -def test_substraction( - e1: LinearExpression, e2: LinearExpression, expected: LinearExpression -) -> None: - assert e1 - e2 == expected - - -@pytest.mark.parametrize( - "e1, e2, expected", - [ - ( - LinearExpression([Term(10, "c", "x")], 15), - LinearExpression([], 5), - LinearExpression([Term(2, "c", "x")], 3), - ), - ( - LinearExpression([Term(10, "c", "x")], 15), - LinearExpression([], 1), - LinearExpression([Term(10, "c", "x")], 15), - ), - ( - LinearExpression( - [ - Term( - 10, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ) - ], - 15, - ), - LinearExpression([], 5), - LinearExpression( - [ - Term( - 2, - "c", - "x", - time_operator=TimeShift(-1), - time_aggregator=TimeSum(False), - scenario_aggregator=Expectation(), - ) - ], - 3, - ), - ), - ], -) -def test_division( - e1: LinearExpression, e2: LinearExpression, expected: LinearExpression -) -> None: - assert e1 / e2 == expected - - -def test_division_by_zero_sould_raise_zero_division_error() -> None: - e1 = LinearExpression([Term(10, "c", "x")], 15) - e2 = LinearExpression() - with pytest.raises(ZeroDivisionError) as exc: - _ = e1 / e2 - assert str(exc.value) == "Cannot divide expression by zero" - - -def test_division_by_non_constant_expr_sould_raise_value_error() -> None: - e1 = LinearExpression([Term(10, "c", "x")], 15) - e2 = LinearExpression() - with pytest.raises(ValueError) as exc: - _ = e2 / e1 - assert str(exc.value) == "Cannot divide by a non constant expression" - - -def test_imul_preserve_identity() -> None: - # technical test to check the behaviour of reassigning "self" in imul operator: - # it did not preserve identity, which could lead to weird behaviour - e1 = LinearExpression([], 15) - e2 = e1 - e1 *= LinearExpression([], 2) - assert e1 == LinearExpression([], 30) - assert e2 == e1 - assert e2 is e1