diff --git a/src/cobra/flux_analysis/fastcc.py b/src/cobra/flux_analysis/fastcc.py index 1b34a1ac0..8f3d95b83 100644 --- a/src/cobra/flux_analysis/fastcc.py +++ b/src/cobra/flux_analysis/fastcc.py @@ -1,6 +1,7 @@ """Provide an implementation of FASTCC.""" -from typing import TYPE_CHECKING, List, Optional +from logging import getLogger +from typing import TYPE_CHECKING, List, Optional, Set from optlang.symbolics import Zero @@ -10,9 +11,44 @@ if TYPE_CHECKING: from cobra.core import Model, Reaction +logger = getLogger(__name__) +LARGE_VALUE = 1.0e6 + +def _add_lp7_vars( + model: "Model", rxns: List["Reaction"], flux_threshold: float +) -> None: + """Add the variables and constraints for the LP. + + Parameters + ---------- + model: cobra.Model + The model to operate on. + rxns: list of cobra.Reaction + The reactions to use for LP. + flux_threshold: float + The upper threshold an auxiliary variable can have. + + """ + prob = model.problem + vars_and_cons = [] + + for rxn in rxns: + var = prob.Variable( + "auxiliary_{}".format(rxn.id), lb=0.0, ub=flux_threshold + ) + const = prob.Constraint( + rxn.flux_expression - var, + name="aux_constraint_{}".format(rxn.id), + lb=-LARGE_VALUE, + ) + vars_and_cons.extend([var, const]) + + model.add_cons_vars(vars_and_cons) + model.solver.update() + def _find_sparse_mode( - model: "Model", rxns: List["Reaction"], flux_threshold: float, zero_cutoff: float + model: "Model", rxn_ids: Set[str], zero_cutoff: float ) -> List["Reaction"]: """Perform the LP required for FASTCC. @@ -22,8 +58,6 @@ def _find_sparse_mode( The model to perform FASTCC on. rxns: list of cobra.Reaction The reactions to use for LP. - flux_threshold: float - The upper threshold an auxiliary variable can have. zero_cutoff: float The cutoff below which flux is considered zero. @@ -33,36 +67,27 @@ def _find_sparse_mode( The list of reactions to consider as consistent. """ - if rxns: - obj_vars = [] - vars_and_cons = [] - prob = model.problem - - for rxn in rxns: - var = prob.Variable( - "auxiliary_{}".format(rxn.id), lb=0.0, ub=flux_threshold - ) - const = prob.Constraint( - rxn.forward_variable + rxn.reverse_variable - var, - name="constraint_{}".format(rxn.id), - lb=0.0, - ) - vars_and_cons.extend([var, const]) - obj_vars.append(var) + if not rxn_ids: + return set() + + # Enable constraints for the reactions + for rid in rxn_ids: + model.constraints.get("aux_constraint_{}".format(rid)).lb = 0.0 + + obj_vars = [model.variables.get("auxiliary_{}".format(rid)) for rid in rxn_ids] + model.objective = Zero + model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars}) - model.add_cons_vars(vars_and_cons) - model.objective = prob.Objective(Zero, sloppy=True) - model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars}) + sol = model.optimize(objective_sense="max") - model.optimize(objective_sense="max") - result = [rxn for rxn in model.reactions if abs(rxn.flux) > zero_cutoff] - else: - result = [] + # Disable constraints for the reactions + for rid in rxn_ids: + model.constraints.get("aux_constraint_{}".format(rid)).lb = -LARGE_VALUE - return result + return set(sol.fluxes[sol.fluxes.abs() > zero_cutoff].index) -def _flip_coefficients(model: "Model", rxns: List["Reaction"]) -> None: +def _flip_coefficients(model: "Model", rxn_ids: Set[str]) -> None: """Flip the coefficients for optimizing in reverse direction. Parameters @@ -73,17 +98,19 @@ def _flip_coefficients(model: "Model", rxns: List["Reaction"]) -> None: The list of reactions whose coefficients will be flipped. """ + if not rxn_ids: + return # flip reactions - for rxn in rxns: - const = model.constraints.get("constraint_{}".format(rxn.id)) - var = model.variables.get("auxiliary_{}".format(rxn.id)) + for rxn in rxn_ids: + const = model.constraints.get("aux_constraint_{}".format(rxn)) + var = model.variables.get("auxiliary_{}".format(rxn)) coefs = const.get_linear_coefficients(const.variables) const.set_linear_coefficients({k: -v for k, v in coefs.items() if k is not var}) + model.solver.update() - # flip objective - objective = model.objective - objective_coefs = objective.get_linear_coefficients(objective.variables) - objective.set_linear_coefficients({k: -v for k, v in objective_coefs.items()}) +def _any_set(s): + for x in s: + return set([x]) def fastcc( @@ -133,46 +160,60 @@ def fastcc( """ zero_cutoff = normalize_cutoff(model, zero_cutoff) - irreversible_rxns = [rxn for rxn in model.reactions if not rxn.reversibility] + all_rxns = {rxn.id for rxn in model.reactions} + irreversible_rxns = {rxn.id for rxn in model.reactions if not rxn.reversibility} rxns_to_check = irreversible_rxns + flipped = False + singletons = False with model: + _add_lp7_vars(model, model.reactions, flux_threshold) + rxns_to_keep = _find_sparse_mode( - model, rxns_to_check, flux_threshold, zero_cutoff + model, rxns_to_check, zero_cutoff + ) + rxns_to_check = all_rxns.difference(rxns_to_keep) + logger.info( + "Initial step found %d consistent reactions. " + "Starting the consistency loop for the remaining %d reactions.", + len(rxns_to_keep), len(rxns_to_check) ) - rxns_to_check = list(set(model.reactions).difference(rxns_to_keep)) - - while rxns_to_check: - with model: + while rxns_to_check: + logger.debug( + "reactions to check: %d - consistent reactions: %d - flipped: %d - singletons: %d", + len(rxns_to_check), len(rxns_to_keep), flipped, singletons + ) + check = _any_set(rxns_to_check) if singletons else rxns_to_check new_rxns = _find_sparse_mode( - model, rxns_to_check, flux_threshold, zero_cutoff + model, check, zero_cutoff ) - rxns_to_keep.extend(new_rxns) - - # this condition will be valid for all but the last iteration - if list(set(rxns_to_check).intersection(rxns_to_keep)): - rxns_to_check = list(set(rxns_to_check).difference(rxns_to_keep)) + rxns_to_keep.update(new_rxns) + if rxns_to_check.intersection(rxns_to_keep): + rxns_to_check = rxns_to_check.difference(rxns_to_keep) + flipped = False else: - rxns_to_flip = list(set(rxns_to_check).difference(irreversible_rxns)) - _flip_coefficients(model, rxns_to_flip) - sol = model.optimize(min) - to_add_rxns = sol.fluxes.index[sol.fluxes.abs() > zero_cutoff].tolist() - rxns_to_keep.extend( - [model.reactions.get_by_id(rxn) for rxn in to_add_rxns] - ) - # since this is the last iteration, it needs to break or else - # it will run forever since rxns_to_check won't be empty - break - - consistent_rxns = set(rxns_to_keep) - # need the ids since Reaction objects are created fresh with model.copy() - rxns_to_remove = [ - rxn.id for rxn in set(model.reactions).difference(consistent_rxns) - ] + check_irr = check.difference(irreversible_rxns) + if flipped or not check_irr: + if singletons: + logger.debug("%s is inconsistent", check) + rxns_to_check = rxns_to_check.difference(check) + flipped = False + singletons = True + else: + flipped = True + check = _any_set(rxns_to_check) if singletons else rxns_to_check + _flip_coefficients(model, check_irr) + logger.info( + "Final - consistent reactions: %d - inconsistent reactions: %d [eps=%.2g, tol=%.2g]", + len(rxns_to_keep), len(all_rxns) - len(rxns_to_keep), flux_threshold, zero_cutoff + ) consistent_model = model.copy() - consistent_model.remove_reactions(rxns_to_remove, remove_orphans=True) + consistent_model.remove_reactions( + all_rxns.difference(rxns_to_keep), + remove_orphans=True + ) return consistent_model