diff --git a/pySTAR/__init__.py b/pySTAR/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pySTAR/bigm_operators.py b/pySTAR/bigm_operators.py new file mode 100644 index 0000000..78077b5 --- /dev/null +++ b/pySTAR/bigm_operators.py @@ -0,0 +1,584 @@ +""" +This module contains all the supported operator models +for symbolic regression. The operator constraints are implemented using +the big-m approach +""" + +import pandas as pd +from pyomo.core.base.block import BlockData, declare_custom_block +import pyomo.environ as pyo + + +@declare_custom_block("BaseOperator", rule="build") +class BaseOperatorData(BlockData): + """Operator template for big-m type constraints""" + + @property + def symbolic_regression_model(self): + """Returns a pointer to the symbolic regression model""" + # Model layout: SymbolicRegressionModel + # |___SampleBlock + # |___OperatorBlock + # Regression model is two levels up, so calling the + # parent_block method twice. The model is needed to access the + # operator binary variables. + return self.parent_block().parent_block() + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + """ + Builds the operator-specific constraints + + Parameters + ---------- + val_node : Var + Variables corresponding to the value at a node. It is an + indexed Var, where the index denotes the node number. + + op_bin_var : dict + Dictionary containing the binary variables associated with + the operator. Here, the keys correspond to node numbers, and the + values contain the operator binary variable. + """ + raise NotImplementedError("Operator-specific constraints are not implemented") + + def construct_convex_relaxation(self): + """Constructs a convex relaxation of the operator-specific constraints""" + raise NotImplementedError() + + @staticmethod + def compute_node_value(left_child: float, right_child: float): + """Computes the value at a node based on the operator""" + raise NotImplementedError() + + +@declare_custom_block("SumOperator", rule="build") +class SumOperatorData(BaseOperatorData): + """Adds constraints for the addition operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = vub - 2 * vlb + return val_node[n] - (val_node[2 * n] + val_node[2 * n + 1]) <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = vlb - 2 * vub + return val_node[n] - (val_node[2 * n] + val_node[2 * n + 1]) >= bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + # Constraints are convex, so returning the block as it is + pass + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child + right_child + + +@declare_custom_block("DiffOperator", rule="build") +class DiffOperatorData(BaseOperatorData): + """Adds constraints for the difference operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = 2 * vub - vlb + return val_node[n] - (val_node[2 * n] - val_node[2 * n + 1]) <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = 2 * vlb - vub + return val_node[n] - (val_node[2 * n] - val_node[2 * n + 1]) >= bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + # Constraints are convex, so returning the block as it is + pass + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child - right_child + + +@declare_custom_block("MultOperator", rule="build") +class MultOperatorData(BaseOperatorData): + """Adds constraints for the multiplication operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + product_values = [ + pyo.value(vlb * vlb), + pyo.value(vlb * vub), + pyo.value(vub * vub), + ] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = vub - min(product_values) + return val_node[n] - (val_node[2 * n] * val_node[2 * n + 1]) <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = vlb - max(product_values) + return val_node[n] - (val_node[2 * n] * val_node[2 * n + 1]) >= bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child * right_child + + +@declare_custom_block("DivOperator", rule="build") +class DivOperatorData(BaseOperatorData): + """Adds constraints for the division operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + eps = srm.eps_value + product_values = [ + pyo.value(vlb * vlb), + pyo.value(vlb * vub), + pyo.value(vub * vub), + ] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = max(product_values) - vlb + return (val_node[n] * val_node[2 * n + 1]) - val_node[2 * n] <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = min(product_values) - vub + return (val_node[n] * val_node[2 * n + 1]) - val_node[2 * n] >= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def avoid_zero_constraint(_, n): + # Ensures that the denominator is away from zero + # return eps * op_bin_var[n] <= val_node[2 * n + 1] ** 2 + return val_node[2 * n + 1] ** 2 >= op_bin_var[n] - 1 + eps + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child / right_child + + +@declare_custom_block("SqrtOperator", rule="build") +class SqrtOperatorData(BaseOperatorData): + """Adds constraints for the square root operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = max(pyo.value(vlb * vlb), pyo.value(vub * vub)) - vlb + return val_node[n] ** 2 - val_node[2 * n + 1] <= bigm * (1 - op_bin_var[n]) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = -vub + return val_node[n] ** 2 - val_node[2 * n + 1] >= bigm * (1 - op_bin_var[n]) + + @self.Constraint(srm.non_terminal_nodes_set) + def non_negativity_constraint(_, n): + return val_node[2 * n + 1] >= (1 - op_bin_var[n]) * vlb + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.sqrt(right_child) + + +@declare_custom_block("ExpOldOperator", rule="build") +class ExpOldOperatorData(BaseOperatorData): + """Adds constraints for the exponential operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = vub + return val_node[n] - pyo.exp(val_node[2 * n + 1]) <= bigm * ( + 1 - op_bin_var[n] + ) + + # NOTE: Would this constrain the value of v[2*n+1]? + if vub >= 10: + _bigm = vlb - 1e5 + else: + _bigm = vlb - pyo.exp(vub) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + return val_node[n] - pyo.exp(val_node[2 * n + 1]) >= _bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.exp(right_child) + + +@declare_custom_block("ExpCompOperator", rule="build") +class ExpCompOperatorData(BaseOperatorData): + """Adds constraints for the exponential operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + + @self.Constraint(srm.non_terminal_nodes_set) + def complementarity_constraint(_, n): + return op_bin_var[n] * (val_node[n] - pyo.exp(val_node[2 * n + 1])) == 0 + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.exp(right_child) + + +@declare_custom_block("ExpOperator", rule="build") +class ExpOperatorData(BaseOperatorData): + """Adds constraints for the exponential operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + # pylint: disable = attribute-defined-outside-init + self.aux_var_exp = pyo.Var( + srm.non_terminal_nodes_set, + bounds=(max(-10, vlb), pyo.log(vub)), + doc="Auxiliary variable for modeling exp function", + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def func_upper_bound_constraint(blk, n): + bigm = vub - 0 + return val_node[n] - pyo.exp(blk.aux_var_exp[n]) <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def func_lower_bound_constraint(blk, n): + bigm = vlb - vub + return val_node[n] - pyo.exp(blk.aux_var_exp[n]) >= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def var_upper_bound_constraint(blk, n): + bigm = vub - blk.aux_var_exp[n].lb + return val_node[2 * n + 1] - blk.aux_var_exp[n] <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def var_lower_bound_constraint(blk, n): + bigm = vlb - blk.aux_var_exp[n].ub + return val_node[2 * n + 1] - blk.aux_var_exp[n] >= bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.exp(right_child) + + +@declare_custom_block("LogOldOperator", rule="build") +class LogOldOperatorData(BaseOperatorData): + """Adds constraints for the logarithm operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + eps = srm.eps_value + + # NOTE: Would this constrain the value of v[2*n+1]? + if vub >= 10: + _bigm = 1e5 - vlb + else: + _bigm = pyo.exp(vub) - vlb + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + return pyo.exp(val_node[n]) - val_node[2 * n + 1] <= _bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = -vub + return pyo.exp(val_node[n]) - val_node[2 * n + 1] >= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def avoid_zero_constraint(_, n): + # Ensures that the denominator is away from zero + # return eps * op_bin_var[n] <= val_node[2 * n + 1] ** 2 + return op_bin_var[n] * val_node[2 * n + 1] >= op_bin_var[n] - 1 + eps + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.log(right_child) + + +@declare_custom_block("LogCompOperator", rule="build") +class LogCompOperatorData(BaseOperatorData): + """Adds constraints for the exponential operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + + @self.Constraint(srm.non_terminal_nodes_set) + def complementarity_constraint(_, n): + # Need to transform the equation to avoid log domain error + return op_bin_var[n] * (pyo.exp(val_node[n]) - val_node[2 * n + 1]) == 0 + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.log(right_child) + + +@declare_custom_block("LogOperator", rule="build") +class LogOperatorData(BaseOperatorData): + """Adds constraints for the exponential operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + # pylint: disable = attribute-defined-outside-init + self.aux_var_log = pyo.Var( + srm.non_terminal_nodes_set, + bounds=(srm.eps_value, vub), + doc="Auxiliary variable for modeling log function", + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def func_upper_bound_constraint(blk, n): + bigm = vub - pyo.log(blk.aux_var_log[n].lb) + return val_node[n] - pyo.log(blk.aux_var_log[n]) <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def func_lower_bound_constraint(blk, n): + bigm = vlb - pyo.log(blk.aux_var_log[n].ub) + return val_node[n] - pyo.log(blk.aux_var_log[n]) >= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def var_upper_bound_constraint(blk, n): + bigm = vub - blk.aux_var_log[n].lb + return val_node[2 * n + 1] - blk.aux_var_log[n] <= bigm * ( + 1 - op_bin_var[n] + ) + + @self.Constraint(srm.non_terminal_nodes_set) + def var_lower_bound_constraint(blk, n): + bigm = vlb - blk.aux_var_log[n].ub + return val_node[2 * n + 1] - blk.aux_var_log[n] >= bigm * ( + 1 - op_bin_var[n] + ) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.log(right_child) + + +@declare_custom_block("SquareOperator", rule="build") +class SquareOperatorData(BaseOperatorData): + """Adds constraints for the square operator""" + + def build(self, *args, val_node: pyo.Var, op_bin_var: dict): + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + @self.Constraint(srm.non_terminal_nodes_set) + def upper_bound_constraint(_, n): + bigm = vub + return val_node[n] - val_node[2 * n + 1] ** 2 <= bigm * (1 - op_bin_var[n]) + + @self.Constraint(srm.non_terminal_nodes_set) + def lower_bound_constraint(_, n): + bigm = vlb - max(pyo.value(vlb * vlb), pyo.value(vub * vub)) + return val_node[n] - val_node[2 * n + 1] ** 2 >= bigm * (1 - op_bin_var[n]) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return right_child**2 + + +# pylint: disable = undefined-variable +BIGM_OPERATORS = { + "sum": SumOperator, + "diff": DiffOperator, + "mult": MultOperator, + "div": DivOperator, + "sqrt": SqrtOperator, + "log": LogOperator, + "log_comp": LogCompOperator, + "log_old": LogOldOperator, + "exp": ExpOperator, + "exp_comp": ExpCompOperator, + "exp_old": ExpOldOperator, + "square": SquareOperator, +} + + +@declare_custom_block("BigmSampleBlock", rule="build") +class BigmSampleBlockData(BlockData): + """Class for evaluating the expression tree for each sample""" + + @property + def symbolic_regression_model(self): + """Returns a pointer to the symbolic regression model""" + return self.parent_block() + + # pylint: disable=attribute-defined-outside-init + def build(self, *args): + """Builds the expression tree for a given sample""" + srm = self.symbolic_regression_model + self.val_node = pyo.Var( + srm.nodes_set, + bounds=(srm.var_bounds["lb"], srm.var_bounds["ub"]), + doc="Value at each node", + ) + + # Value defining constraints for operands + data = srm.input_data_ref.loc[args[0]] + + @self.Constraint(srm.nodes_set) + def value_upper_bound_constraint(blk, n): + return blk.val_node[n] <= srm.constant_val[n] + sum( + data[op] * srm.select_operator[n, op] + for op in srm.operands_set + if op != "cst" + ) + srm.var_bounds["ub"] * sum( + srm.select_operator[n, op] for op in srm.operators_set + ) + + @self.Constraint(srm.nodes_set) + def value_lower_bound_constraint(blk, n): + return blk.val_node[n] >= srm.constant_val[n] + sum( + data[op] * srm.select_operator[n, op] + for op in srm.operands_set + if op != "cst" + ) + srm.var_bounds["lb"] * sum( + srm.select_operator[n, op] for op in srm.operators_set + ) + + for op in srm.operators_set: + # Get binary variables corresponding to the opeartor + # and save them in a dictionary. Here, the keys are node + # numbers and the values are Var objects. + op_bin_vars = {n: srm.select_operator[n, op] for n in srm.nodes_set} + setattr( + self, + op + "_operator", + BIGM_OPERATORS[op](val_node=self.val_node, op_bin_var=op_bin_vars), + ) + + self.residual = pyo.Var(doc="Residual value for the sample") + self.calculate_residual = pyo.Constraint( + expr=self.residual == srm.output_data_ref[self.index()] - self.val_node[1], + doc="Computes the residual between prediction and the data", + ) + self.square_of_residual = pyo.Expression(expr=self.residual**2) + + def add_symmetry_breaking_cuts(self): + """Adds symmetry breaking cuts to the sample""" + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + symmetric_operators = [ + op for op in ["sum", "mult"] if op in srm.binary_operators_set + ] + + @self.Constraint(srm.non_terminal_nodes_set) + def symmetry_breaking_constraints(blk, n): + return (blk.val_node[2 * n] - blk.val_node[2 * n + 1]) >= (vlb - vub) * ( + srm.select_node[n] + - sum(srm.select_operator[n, op] for op in symmetric_operators) + ) + + def compare_node_values(self): + """Compares the node values obtained from the model and manual calculation""" + srm = self.symbolic_regression_model + data = srm.input_data_ref.loc[self.index()] + true_value = { + n: srm.constant_val[n].value + + sum( + data[op] * srm.select_operator[n, op].value + for op in srm.operands_set + if op != "cst" + ) + for n in srm.nodes_set + } + for n in range(len(srm.non_terminal_nodes_set), 0, -1): + for op in srm.operators_set: + if srm.select_operator[n, op].value > 0.99: + # Operator is selector + true_value[n] += getattr(self, op + "_operator").compute_node_value( + true_value[2 * n], true_value[2 * n + 1] + ) + + computed_value = {n: self.val_node[n].value for n in self.val_node} + + return pd.DataFrame.from_dict( + {"True Value": true_value, "Computed Value": computed_value} + ) diff --git a/pySTAR/hull_operators.py b/pySTAR/hull_operators.py new file mode 100644 index 0000000..a05b52e --- /dev/null +++ b/pySTAR/hull_operators.py @@ -0,0 +1,409 @@ +""" +This module contains all the supported operator models +for symbolic regression. +""" + +import logging +import pandas as pd +from pyomo.core.base.block import BlockData, declare_custom_block +from pyomo.environ import Var, Constraint +import pyomo.environ as pyo + +LOGGER = logging.getLogger(__name__) + + +@declare_custom_block("BaseOperator", rule="build") +class BaseOperatorData(BlockData): + """Base class for defining operator models""" + + _is_unary_operator = False + _define_aux_var_right = False + + @property + def symbolic_regression_model(self): + """Returns a pointer to the symbolic regression model""" + return self._bin_var_ref.parent_block() + + @property + def operator_binary(self): + """Returns the binary variable associated with the operator""" + return self._bin_var_ref + + # pylint: disable = attribute-defined-outside-init, unused-argument + def build(self, *args, bin_var: Var): + """rule for constructing operator blocks""" + srm = bin_var.parent_block() # Symbolic regression model + lb = srm.var_bounds["lb"] + ub = srm.var_bounds["ub"] + eps = srm.eps_value + + # Declare auxiliary variables + self._bin_var_ref = bin_var # Store a reference to the binary var + self.val_node = Var(doc="Node value", bounds=(lb, ub)) + self.val_left_node = Var(doc="Left child value", bounds=(lb, ub)) + self.val_right_node = Var(doc="Right child value", bounds=(lb, ub)) + + if self._is_unary_operator: + # For unary operators, left node must not be chosen + self.val_left_node.fix(0) + + if self._define_aux_var_right: + # Declare an auxiliary variable for operators with singularity issue + self.aux_var_right = Var( + doc="Auxiliary variable for the right child", + bounds=(eps, max(1, ub)), # Ensure that the RHS is strictly positive + ) + + # Ensure that aux_var_right = (val_right_node if bin_var = 1 else 1) + self.calculate_aux_var_right = Constraint( + expr=self.aux_var_right == self.val_right_node + 1 - bin_var + ) + + # Add operator-specific constraints + try: + self.build_operator_model() + except NotImplementedError: + LOGGER.warning("Operator-specific model constraints are not implemented.") + + def build_operator_model(self): + """Adds operator-specific constraints""" + raise NotImplementedError("Operator-specific model is not implemented") + + # pylint: disable = no-member, attribute-defined-outside-init + def add_bound_constraints(self, val_left_node=True, val_right_node=True): + """Adds bound constraints on val_node variable""" + bin_var = self._bin_var_ref + + self.node_val_lb_con = Constraint( + expr=self.val_node.lb * bin_var <= self.val_node + ) + self.node_val_ub_con = Constraint( + expr=self.val_node <= self.val_node.ub * bin_var + ) + + if val_left_node: + self.left_node_val_lb_con = Constraint( + expr=self.val_left_node.lb * bin_var <= self.val_left_node + ) + self.left_node_val_ub_con = Constraint( + expr=self.val_left_node <= self.val_left_node.ub * bin_var + ) + + if val_right_node: + self.right_node_val_lb_con = Constraint( + expr=self.val_right_node.lb * bin_var <= self.val_right_node + ) + self.right_node_val_ub_con = Constraint( + expr=self.val_right_node <= self.val_right_node.ub * bin_var + ) + + def construct_convex_relaxation(self): + """Constructs a convex relaxation of the model""" + raise NotImplementedError("Convex relaxation is not supported") + + @staticmethod + def compute_node_value(left_child, right_child): + """Helper function to evaluate the value for a given operator""" + raise NotImplementedError("Helper function is not implemented") + + +# pylint: disable = attribute-defined-outside-init, missing-class-docstring +@declare_custom_block("SumOperator", rule="build") +class SumOperatorData(BaseOperatorData): + + def build_operator_model(self): + self.evaluate_val_node = Constraint( + expr=self.val_node == self.val_left_node + self.val_right_node + ) + self.add_bound_constraints() + + def construct_convex_relaxation(self): + # Operator model is convex by default, so return. + pass + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child + right_child + + +@declare_custom_block("DiffOperator", rule="build") +class DiffOperatorData(BaseOperatorData): + + def build_operator_model(self): + self.evaluate_val_node = Constraint( + expr=self.val_node == self.val_left_node - self.val_right_node + ) + self.add_bound_constraints() + + def construct_convex_relaxation(self): + # Operator model is convex by default, so return. + pass + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child - right_child + + +@declare_custom_block("MultOperator", rule="build") +class MultOperatorData(BaseOperatorData): + + def build_operator_model(self): + self.evaluate_val_node = Constraint( + expr=self.val_node == self.val_left_node * self.val_right_node + ) + self.add_bound_constraints() + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child * right_child + + +@declare_custom_block("DivOperator", rule="build") +class DivOperatorData(BaseOperatorData): + _define_aux_var_right = True + + def build_operator_model(self): + self.evaluate_val_node = Constraint( + expr=self.val_node * self.aux_var_right == self.val_left_node + ) + self.add_bound_constraints() + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(left_child, right_child): + return left_child / right_child + + +# pylint: disable = no-member +@declare_custom_block("SquareOperator", rule="build") +class SquareOperatorData(BaseOperatorData): + _is_unary_operator = True + + def build_operator_model(self): + # Evaluate the value at the node + self.evaluate_val_node = Constraint( + expr=self.val_node == self.val_right_node * self.val_right_node + ) + + # val_node will be non-negative in this case, so update lb + self.val_node.setlb(0) + + # For unary operator, left node is fixed to zero. + self.add_bound_constraints(val_left_node=False) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return right_child**2 + + +@declare_custom_block("SqrtOperator", rule="build") +class SqrtOperatorData(BaseOperatorData): + _is_unary_operator = True + + def build_operator_model(self): + # Expressing the constraint in this manner makes it a + # quadratic constraint, instead of a general nonlinear constraint + self.evaluate_val_node = Constraint( + expr=self.val_node * self.val_node == self.val_right_node + ) + + # val_right_node must be non-negative in this case + self.val_right_node.setlb(0) + self.add_bound_constraints(val_left_node=False) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.sqrt(right_child) + + +@declare_custom_block("ExpOperator", rule="build") +class ExpOperatorData(BaseOperatorData): + _is_unary_operator = True + + def build_operator_model(self): + # Update the domain of variables + ub_val_node = self.val_node.ub + ub_val_right_node = self.val_right_node.ub + + self.val_right_node.setub(min(pyo.log(ub_val_node), ub_val_right_node)) + self.val_node.setlb(0) + + # To avoid numerical issues, we do not let the lower bound of the + # argument of the exp function go below -10 + self.val_right_node.setlb(max(-10, self.val_right_node.lb)) + self.add_bound_constraints(val_left_node=False) + + # If the operator is not chosen, then val_right_node = 0, + # exp(val_right_node) = 1. So, we add (bin_var - 1) to make the + # expression evaluate to zero. + self.evaluate_val_node = Constraint( + expr=self.val_node == pyo.exp(self.val_right_node) + self._bin_var_ref - 1 + ) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.exp(right_child) + + +@declare_custom_block("LogOperator", rule="build") +class LogOperatorData(BaseOperatorData): + _is_unary_operator = True + _define_aux_var_right = True + + def build_operator_model(self): + # If the operator is not selected, aux_var_right = 1, so log vanishes + self.evaluate_val_node = Constraint( + expr=self.val_node == pyo.log(self.aux_var_right) + ) + + self.add_bound_constraints(val_left_node=False) + + def construct_convex_relaxation(self): + raise NotImplementedError() + + @staticmethod + def compute_node_value(_, right_child): + return pyo.log(right_child) + + +OPERATOR_MODELS = { + "sum": SumOperator, + "diff": DiffOperator, + "mult": MultOperator, + "div": DivOperator, + "square": SquareOperator, + "sqrt": SqrtOperator, + "log": LogOperator, + "exp": ExpOperator, +} + + +@declare_custom_block("HullSampleBlock", rule="build") +class HullSampleBlockData(BlockData): + """Class for evaluating the expression tree for each sample""" + + @property + def symbolic_regression_model(self): + """Returns a pointer to the symbolic regression model""" + return self.parent_block() + + def build(self, s): + """rule for building the expression model for each sample""" + pb = self.parent_block() + input_data = pb.input_data_ref.loc[s] + output_value = pb.output_data_ref.loc[s] + + @self.Block(pb.nodes_set) + def node(blk, n): + # Declare a variable to track the value at this node + blk.val_node = Var( + doc="Value at the node", + bounds=(pb.var_bounds["lb"], pb.var_bounds["ub"]), + ) + + # Defining an expression for the val_node variable arising from operands + blk.value_from_operands = pyo.Expression( + expr=sum( + pb.select_operator[n, op] * input_data[op] + for op in input_data.index + ) + + pb.constant_val[n] + ) + + # Define a constraint to evaluate the value from all disjuncts + if n in pb.terminal_nodes_set: + # For terminal nodes, val_node = value from the operands chosen + blk.evaluate_val_node_var = Constraint( + expr=blk.val_node == blk.value_from_operands + ) + + for n in pb.non_terminal_nodes_set: + op_blocks = [] + for op in pb.operators_set: + # Build operator blocks for all unary and binary operators + setattr( + self.node[n], + op + "_operator", + OPERATOR_MODELS[op](bin_var=pb.select_operator[n, op]), + ) + op_blocks.append(getattr(self.node[n], op + "_operator")) + + self.node[n].evaluate_val_node_var = Constraint( + expr=self.node[n].val_node + == sum(blk.val_node for blk in op_blocks) + + self.node[n].value_from_operands + ) + + self.node[n].evaluate_val_left_node = Constraint( + expr=self.node[2 * n].val_node + == sum(blk.val_left_node for blk in op_blocks) + ) + self.node[n].evaluate_val_right_node = Constraint( + expr=self.node[2 * n + 1].val_node + == sum(blk.val_right_node for blk in op_blocks) + ) + + self.residual = pyo.Var(doc="Residual value for the sample") + self.calculate_residual = pyo.Constraint( + expr=self.residual == output_value - self.node[1].val_node, + doc="Computes the residual between prediction and the data", + ) + self.square_of_residual = pyo.Expression(expr=self.residual**2) + + def add_symmetry_breaking_cuts(self): + """Adds symmetry breaking cuts to the sample""" + srm = self.symbolic_regression_model + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + symmetric_operators = [ + op for op in ["sum", "mult"] if op in srm.binary_operators_set + ] + + @self.Constraint(srm.non_terminal_nodes_set) + def symmetry_breaking_constraints(blk, n): + return blk.node[2 * n].val_node - blk.node[2 * n + 1].val_node >= ( + vlb - vub + ) * ( + srm.select_node[n] + - sum(srm.select_operator[n, op] for op in symmetric_operators) + ) + + def compare_node_values(self): + """Compares the node values obtained from the model and manual calculation""" + srm = self.symbolic_regression_model + data = srm.input_data_ref.loc[self.index()] + true_value = { + n: srm.constant_val[n].value + + sum( + data[op] * srm.select_operator[n, op].value + for op in srm.operands_set + if op != "cst" + ) + for n in srm.nodes_set + } + for n in range(len(srm.non_terminal_nodes_set), 0, -1): + for op in srm.operators_set: + if srm.select_operator[n, op].value > 0.99: + # Operator is selector + true_value[n] += getattr( + self.node[n], op + "_operator" + ).compute_node_value(true_value[2 * n], true_value[2 * n + 1]) + + computed_value = {n: self.node[n].val_node.value for n in self.node} + + return pd.DataFrame.from_dict( + {"True Value": true_value, "Computed Value": computed_value} + ) diff --git a/pySTAR/symbolic_regression.py b/pySTAR/symbolic_regression.py new file mode 100644 index 0000000..1cc4e81 --- /dev/null +++ b/pySTAR/symbolic_regression.py @@ -0,0 +1,385 @@ +import logging +from typing import List, Optional +import pandas as pd +import pyomo.environ as pyo +from pyomo.environ import Var, Constraint + +# pylint: disable = import-error +from bigm_operators import BigmSampleBlock, BigmSampleBlockData +from hull_operators import HullSampleBlock + +LOGGER = logging.getLogger(__name__) +SUPPORTED_UNARY_OPS = ["square", "sqrt", "log", "exp"] +SUPPORTED_BINARY_OPS = ["sum", "diff", "mult", "div"] +SUPPORTED_UNARY_OPS += ["exp_old", "exp_comp", "log_old", "log_comp"] + + +# pylint: disable = logging-fstring-interpolation +class SymbolicRegressionModel(pyo.ConcreteModel): + """Builds the symbolic regression model for a given data""" + + def __init__( + self, + *args, + data: pd.DataFrame, + input_columns: List[str], + output_column: str, + tree_depth: int, + operators: Optional[list] = None, + var_bounds: tuple = (-100, 100), + constant_bounds: tuple = (-100, 100), + eps: float = 1e-4, + model_type: str = "bigm", + **kwds, + ): + super().__init__(*args, **kwds) + + self.model_type = model_type + # Check if the received operators are supported or not + if operators is None: + # If not specified, use all supported operators + unary_operators = SUPPORTED_UNARY_OPS + binary_operators = SUPPORTED_BINARY_OPS + + else: + # Ensure that there are no duplicates in the list of operators + if len(operators) != len(set(operators)): + raise ValueError("Duplicates are present in the list of operators") + + unary_operators = [] + binary_operators = [] + for op in operators: + if op in SUPPORTED_UNARY_OPS: + unary_operators.append(op) + elif op in SUPPORTED_BINARY_OPS: + binary_operators.append(op) + else: + raise ValueError(f"Operator {op} is not recognized!") + + LOGGER.info(f"Using {unary_operators} unary operators.") + LOGGER.info(f"Using {binary_operators} binary operators.") + + # Save a reference to the input and output data + _col_name_map = { + _name: f"x{index + 1}" for index, _name in enumerate(input_columns) + } + self.input_data_ref = data[input_columns].rename(columns=_col_name_map) + self.output_data_ref = data[output_column] + self.var_bounds = pyo.Param( + ["lb", "ub"], + initialize={"lb": var_bounds[0], "ub": var_bounds[1]}, + ) + self.constant_bounds = pyo.Param( + ["lb", "ub"], + initialize={"lb": constant_bounds[0], "ub": constant_bounds[1]}, + ) + self.eps_value = pyo.Param(initialize=eps, domain=pyo.PositiveReals) + self.min_tree_size = pyo.Param( + initialize=1, + mutable=True, + within=pyo.PositiveIntegers, + doc="Minimum number of active nodes in the tree", + ) + self.max_tree_size = pyo.Param( + initialize=2**tree_depth, + mutable=True, + within=pyo.PositiveIntegers, + doc="Maximum number of active nodes in the tree", + ) + + # Define model sets + # pylint: disable = no-member + self.binary_operators_set = pyo.Set(initialize=binary_operators) + self.unary_operators_set = pyo.Set(initialize=unary_operators) + self.operators_set = self.binary_operators_set.union(self.unary_operators_set) + self.operands_set = pyo.Set(initialize=list(_col_name_map.values()) + ["cst"]) + self.collective_operator_set = self.operators_set.union(self.operands_set) + + self.non_terminal_nodes_set = pyo.RangeSet(2 ** (tree_depth - 1) - 1) + self.terminal_nodes_set = pyo.RangeSet(2 ** (tree_depth - 1), 2**tree_depth - 1) + self.nodes_set = self.non_terminal_nodes_set.union(self.terminal_nodes_set) + + # Build the expression model + self._build_expression_tree_model() + + # Calculate the output value for each sample + if model_type == "bigm": + self.samples = BigmSampleBlock(self.input_data_ref.index.to_list()) + elif model_type == "hull": + self.samples = HullSampleBlock(self.input_data_ref.index.to_list()) + else: + raise ValueError(f"model_type: {model_type} is not recognized.") + + def _build_expression_tree_model(self): + """Builds constraints that return a valid expression tree""" + # Define variables + # pylint: disable = attribute-defined-outside-init, not-an-iterable + self.select_node = Var( + self.nodes_set, + domain=pyo.Binary, + doc="If 1, an operator/operator is assigned to the node", + ) + self.select_operator = Var( + self.nodes_set, + self.collective_operator_set, + domain=pyo.Binary, + doc="Binary variable associated with the operator at each node", + ) + self.constant_val = Var(self.nodes_set, doc="Constant values in the expression") + + # Binary and unary operators must not be chosen for terminal nodes + for n in self.terminal_nodes_set: + for op in self.operators_set: + self.select_operator[n, op].fix(0) + + # Constant must only be present in the left node (i.e., even-numbered nodes) + for n in self.nodes_set: + if n % 2 == 1: + self.select_operator[n, "cst"].fix(0) + + # Begin constructing essential constraints + # If a node is active, then either an operator or an operand + # must be chosen + @self.Constraint(self.nodes_set) + def active_node_active_operator(blk, n): + return blk.select_node[n] == sum( + blk.select_operator[n, op] for op in self.collective_operator_set + ) + + # If a binary or unary operator is chosen, then right child must be present + @self.Constraint(self.non_terminal_nodes_set) + def right_child_presence(blk, n): + return blk.select_node[2 * n + 1] == sum( + blk.select_operator[n, op] for op in self.operators_set + ) + + # If a binary operator is chosen, then left child must be present + @self.Constraint(self.non_terminal_nodes_set) + def left_child_presence(blk, n): + return blk.select_node[2 * n] == sum( + blk.select_operator[n, op] for op in self.binary_operators_set + ) + + # Set constant value to zero, if constant operator is not selected + @self.Constraint(self.nodes_set) + def constant_lb_con(blk, n): + return ( + blk.constant_bounds["lb"] * blk.select_operator[n, "cst"] + <= blk.constant_val[n] + ) + + @self.Constraint(self.nodes_set) + def constant_ub_con(blk, n): + return ( + blk.constant_val[n] + <= blk.constant_bounds["ub"] * blk.select_operator[n, "cst"] + ) + + # pylint: disable = attribute-defined-outside-init + def add_objective(self, objective_type: str = "sse"): + """Appends objective function to the model + + Parameters + ---------- + objective_type : str, optional + Choice of objective function. Supported objective functions are + Sum of squares of errors: "sse" + Bayesian Information Criterion: "bic", by default "sse" + """ + if objective_type == "sse": + # build SSR objective + self.sse = pyo.Objective(expr=sum(self.samples[:].square_of_residual)) + + elif objective_type == "bic": + # Build BIC objective + raise NotImplementedError("BIC is not supported currently") + + else: + raise ValueError( + f"Specified objective_type: {objective_type} is not supported." + ) + + def add_redundancy_cuts(self): + """Adds cuts to eliminate redundance from the model""" + + # Do not choose the same operand for both the child nodes. + # This cut avoids expressions of type: x + x, x - x, x * x, x / x. + @self.Constraint(self.non_terminal_nodes_set, self.operands_set) + def redundant_bin_op_same_operand(blk, n, op): + if op == "cst": + # Both child nodes cannot take a constant, so skip this case + return Constraint.Skip + + return ( + blk.select_operator[2 * n, op] + blk.select_operator[2 * n + 1, op] + <= blk.select_node[n] + ) + + # Remove Associative operator combinations + + # Remove composition of inverse functions exp(log(.)) square(sqrt(.)) + def _inverse_function_rule(blk, n, op_1, op_2): + if op_1 == op_2 or (2 * n + 1) in self.terminal_nodes_set: + # An example for op_1 = op_2: exp(exp(.)) or log(log(.)) + # Also, unary operator cannot be present in a terminal node + return Constraint.Skip + return ( + blk.select_operator[n, op_1] + blk.select_operator[2 * n + 1, op_2] + <= blk.select_node[n] + ) + + if "exp" in self.unary_operators_set and "log" in self.unary_operators_set: + self.redundant_inv_op_exp_log = Constraint( + self.non_terminal_nodes_set, + ["exp", "log"], + ["exp", "log"], + rule=_inverse_function_rule, + ) + + if "square" in self.unary_operators_set and "sqrt" in self.unary_operators_set: + self.redundant_inv_op_square_sqrt = Constraint( + self.non_terminal_nodes_set, + ["square", "sqrt"], + ["square", "sqrt"], + rule=_inverse_function_rule, + ) + + def add_symmetry_breaking_cuts(self, sample_name=None): + """Adds cuts to eliminate symmetrical trees from the model""" + if sample_name is None: + # If the sample is not specified, apply symmetry breaking + # cuts to the first sample + sample_name = self.input_data_ref.iloc[0].name + + self.samples[sample_name].add_symmetry_breaking_cuts() + + def add_implication_cuts(self): + """ + Adds cuts to avoid a certain combinations of operators based on the input data + """ + # Avoid division by zero + near_zero_operands, negative_operands = _get_operand_domains( + data=self.input_data_ref, tol=pyo.value(self.eps_value) + ) + + # pylint: disable = logging-fstring-interpolation + if len(near_zero_operands) > 0 and "div" in self.binary_operators_set: + LOGGER.info( + f"Data for operands {near_zero_operands} is close to zero. " + f"Adding cuts to prevent division with these operands." + ) + + @self.Constraint(self.non_terminal_nodes_set, near_zero_operands) + def implication_cuts_div_operator(blk, n, op): + return ( + blk.select_operator[n, "div"] + blk.select_operator[2 * n + 1, op] + <= blk.select_node[n] + ) + + if len(negative_operands) > 0 and "sqrt" in self.unary_operators_set: + LOGGER.info( + f"Data for operands {negative_operands} is negative. " + f"Adding cuts to prevent sqrt of these operands." + ) + + @self.Constraint(self.non_terminal_nodes_set, negative_operands) + def implication_cuts_sqrt_operator(blk, n, op): + return ( + blk.select_operator[n, "sqrt"] + blk.select_operator[2 * n + 1, op] + <= blk.select_node[n] + ) + + non_positive_operands = list(set(near_zero_operands + negative_operands)) + if len(non_positive_operands) > 0 and "log" in self.unary_operators_set: + LOGGER.info( + f"Data for operands {non_positive_operands} is non-positive. " + f"Adding cuts to prevent log of these operands." + ) + + @self.Constraint(self.non_terminal_nodes_set, non_positive_operands) + def implication_cuts_log_operator(blk, n, op): + return ( + blk.select_operator[n, "log"] + blk.select_operator[2 * n + 1, op] + <= blk.select_node[n] + ) + + def constrain_max_tree_size(self, size: int): + """Adds a constraint to constrain the maximum size of the tree""" + self.max_tree_size = size + + # If the constraint exists, then updating the parameter value is sufficient + # pylint: disable = attribute-defined-outside-init + if not hasattr(self, "max_tree_size_constraint"): + self.max_tree_size_constraint = Constraint( + expr=sum(self.select_node[n] for n in self.nodes_set) + <= self.max_tree_size + ) + + def constrain_min_tree_size(self, size: int): + """Adds a constraint to constrain the minimum size of the tree""" + self.min_tree_size = size + + # If the constraint exists, then updating the parameter value is sufficient + # pylint: disable = attribute-defined-outside-init + if not hasattr(self, "min_tree_size_constraint"): + self.min_tree_size_constraint = Constraint( + expr=sum(self.select_node[n] for n in self.nodes_set) + >= self.min_tree_size + ) + + def relax_integrality_constraints(self): + """Relaxes integrality requirement on all binary variables""" + self.select_node.domain = pyo.UnitInterval + self.select_operator.domain = pyo.UnitInterval + + def add_integrality_constraints(self): + """Adds integrality requirement on node and operator variables""" + self.select_node.domain = pyo.Binary + self.select_operator.domain = pyo.Binary + + def relax_nonconvex_constraints(self): + """Convexifies all non-convex nonlinear constraints""" + raise NotImplementedError("Model Convexification is not currently supported") + + def get_selected_operators(self): + """ + Returns the list of selected nodes along with the selected + operator at that node + """ + return [op for op, val in self.select_operator.items() if val.value > 0.95] + + def selected_tree_to_expression(self): + """Returns the optimal expression as a string""" + + def get_parity_plot_data(self): + """Returns a DataFrame containing actual outputs and predicted outputs""" + results = pd.DataFrame() + results["sim_data"] = self.output_data_ref + first_sample_name = self.input_data_ref.iloc[0].name + if isinstance(self.samples[first_sample_name], BigmSampleBlockData): + results["prediction"] = [ + self.samples[s].val_node[1].value for s in self.samples + ] + else: + results["prediction"] = [ + self.samples[s].node[1].val_node.value for s in self.samples + ] + results["square_of_error"] = [ + self.samples[s].square_of_residual.expr() for s in self.samples + ] + + return results + + +def _get_operand_domains(data: pd.DataFrame, tol: float): + near_zero_operands = [] + negative_operands = [] + + for op in data.columns: + if (abs(data[op]) < tol).any(): + near_zero_operands.append(op) + + if (data[op] < 0).any(): + negative_operands.append(op) + + return near_zero_operands, negative_operands diff --git a/pySTAR/symbolic_regression_example.py b/pySTAR/symbolic_regression_example.py new file mode 100644 index 0000000..a5b197f --- /dev/null +++ b/pySTAR/symbolic_regression_example.py @@ -0,0 +1,52 @@ +import numpy as np +import pandas as pd +import pyomo.environ as pyo +from symbolic_regression import SymbolicRegressionModel +from utils import get_gurobi + +np.random.seed(42) +data = pd.DataFrame( + np.random.uniform(low=1, high=3, size=(10, 2)), columns=["x1", "x2"] +) +data["y"] = data["x1"] ** 2 + data["x2"] +in_cols = ["x1", "x2"] +print(data) + + +def build_model(): + m = SymbolicRegressionModel( + data=data, + input_columns=in_cols, + output_column="y", + tree_depth=4, + operators=["sum", "diff", "mult", "div", "square", "sqrt"], + var_bounds=(-10, 10), + constant_bounds=(-100, 100), + model_type="bigm", + ) + m.add_objective() + + return m + + +if __name__ == "__main__": + solver = "baron" + mdl = build_model() + mdl.add_redundancy_cuts() + # mdl.add_tree_size_constraint(3) + + if solver == "scip": + solver = pyo.SolverFactory("scip") + solver.solve(mdl, tee=True) + + elif solver == "gurobi": + solver = get_gurobi(mdl) + solver.solve(tee=True) + + elif solver == "baron": + solver = pyo.SolverFactory("gams") + solver.solve(mdl, solver="baron", tee=True) + + mdl.constant_val.pprint() + print(mdl.get_parity_plot_data()) + print(mdl.get_selected_operators()) diff --git a/pySTAR/utils.py b/pySTAR/utils.py new file mode 100644 index 0000000..3e5a12c --- /dev/null +++ b/pySTAR/utils.py @@ -0,0 +1,126 @@ +from gurobipy import nlfunc +import pyomo.environ as pyo +import bigm_operators as bop +import hull_operators as hop +from symbolic_regression import SymbolicRegressionModel + + +def _bigm_gurobi_formulation(srm: SymbolicRegressionModel): + for blk in srm.component_data_objects(pyo.Block): + if isinstance(blk, (bop.ExpOperatorData, bop.LogOperatorData)): + # Deactivate the nonlinear constraints + blk.func_upper_bound_constraint.deactivate() + blk.func_lower_bound_constraint.deactivate() + + solver = pyo.SolverFactory("gurobi_persistent") + solver.set_instance(srm) + gm = solver._solver_model # Gurobipy model + pm_to_gm = solver._pyomo_var_to_solver_var_map + vlb, vub = srm.var_bounds["lb"], srm.var_bounds["ub"] + + for blk in srm.component_data_objects(pyo.Block): + if isinstance(blk, bop.LogOperatorData): + sb = blk.parent_block() # Sample block + val_node = sb.val_node + op_bin_var = {n: srm.select_operator[n, "log"] for n in srm.nodes_set} + + aux_vars = gm.addVars(list(srm.non_terminal_nodes_set)) + gm.addConstrs( + aux_vars[n] == nlfunc.log(pm_to_gm[blk.aux_var_log[n]]) + for n in srm.non_terminal_nodes_set + ) + gm.addConstrs( + pm_to_gm[val_node[n]] - aux_vars[n] + <= (vub - pyo.log(blk.aux_var_log[n].lb)) + * (1 - pm_to_gm[op_bin_var[n]]) + for n in srm.non_terminal_nodes_set + ) + gm.addConstrs( + pm_to_gm[val_node[n]] - aux_vars[n] + >= (vlb - pyo.log(blk.aux_var_log[n].ub)) + * (1 - pm_to_gm[op_bin_var[n]]) + for n in srm.non_terminal_nodes_set + ) + + if isinstance(blk, bop.ExpOperatorData): + sb = blk.parent_block() # Sample block + val_node = sb.val_node + op_bin_var = {n: srm.select_operator[n, "exp"] for n in srm.nodes_set} + + aux_vars = gm.addVars(list(srm.non_terminal_nodes_set)) + gm.addConstrs( + aux_vars[n] == nlfunc.exp(pm_to_gm[blk.aux_var_exp[n]]) + for n in srm.non_terminal_nodes_set + ) + gm.addConstrs( + pm_to_gm[val_node[n]] - aux_vars[n] + <= (vub - 0) * (1 - pm_to_gm[op_bin_var[n]]) + for n in srm.non_terminal_nodes_set + ) + gm.addConstrs( + pm_to_gm[val_node[n]] - aux_vars[n] + >= (vlb - vub) * (1 - pm_to_gm[op_bin_var[n]]) + for n in srm.non_terminal_nodes_set + ) + + for blk in srm.component_data_objects(pyo.Block): + if isinstance(blk, (bop.ExpOperatorData, bop.LogOperatorData)): + # Activate the nonlinear constraints + blk.func_upper_bound_constraint.activate() + blk.func_lower_bound_constraint.activate() + + return solver + + +def _hull_gurobi_formulation(m: SymbolicRegressionModel): + """Uses Gurobibpy interface to solve the MINLP""" + + for blk in m.component_data_objects(pyo.Block): + if isinstance(blk, (hop.ExpOperatorData, hop.LogOperatorData)): + # Deactivate the nonlinear constraint + blk.evaluate_val_node.deactivate() + + # pylint: disable = protected-access + grb = pyo.SolverFactory("gurobi_persistent") + grb.set_instance(m) + gm = grb._solver_model + pm_to_gm = grb._pyomo_var_to_solver_var_map + + for blk in m.component_data_objects(pyo.Block): + if isinstance(blk, hop.LogOperatorData): + # Add the nonlinear constraint + gm.addConstr( + pm_to_gm[blk.val_node] == nlfunc.log(pm_to_gm[blk.aux_var_right]) + ) + + elif isinstance(blk, hop.ExpOperatorData): + # Add the nonlinear constraint + gm.addConstr( + pm_to_gm[blk.val_node] + == nlfunc.exp(pm_to_gm[blk.val_right_node]) + + pm_to_gm[blk.operator_binary] + - 1 + ) + + # Activate the constraint back + for blk in m.component_data_objects(pyo.Block): + if isinstance(blk, (hop.LogOperatorData, hop.ExpOperatorData)): + # Activate the nonlinear constraint + blk.evaluate_val_node.activate() + + return grb + + +def get_gurobi(srm: SymbolicRegressionModel, options: dict | None = None): + """Returns Gurobi solver object""" + if options is None: + # Set default termination criteria + options = {"MIPGap": 0.01, "TimeLimit": 3600} + + if srm.model_type == "bigm": + solver = _bigm_gurobi_formulation(srm) + else: + solver = _hull_gurobi_formulation(srm) + + solver.options.update(options) + return solver