Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 173 additions & 14 deletions python/sdist/amici/import_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ObservableTransformation(str, enum.Enum):


def noise_distribution_to_observable_transformation(
noise_distribution: Union[str, Callable]
noise_distribution: Union[Dict[str, Union[str, list]], Callable]
) -> ObservableTransformation:
"""
Parse noise distribution string and extract observable transformation
Expand All @@ -63,17 +63,18 @@ def noise_distribution_to_observable_transformation(
:return:
observable transformation
"""
if isinstance(noise_distribution, str):
if noise_distribution.startswith('log-'):
if isinstance(noise_distribution, dict):
noise_distribution_type = noise_distribution['type']
if noise_distribution_type.startswith('log-'):
return ObservableTransformation.LOG
if noise_distribution.startswith('log10-'):
if noise_distribution_type.startswith('log10-'):
return ObservableTransformation.LOG10

return ObservableTransformation.LIN


def noise_distribution_to_cost_function(
noise_distribution: Union[str, Callable]
noise_distribution: Union[dict, Callable]
) -> Callable[[str], str]:
"""
Parse noise distribution string to a cost function definition amici can
Expand Down Expand Up @@ -104,6 +105,25 @@ def noise_distribution_to_cost_function(
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma m \\log(10)}\\
exp\\left(-\\frac{(\\log_{10} m - \\log_{10} y)^2}{2\\sigma^2}\\right)

- `'left-truncated-normal'`: A truncated normal distribution:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{1-\\Phi(\\frac{a-y}{\\sigma})}
\\frac{1}{\\sqrt{2\\pi}\\sigma}\\
exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right)

where \\Phi is the standard normal cumulative distribution function

.. math::
\\Phi(x) = \\frac{1}{2}(1+\\erf(\\frac{x}{\\sqrt{2}}))

- `'right-truncated-normal'`: A truncated normal distribution:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\Phi(\\frac{b-y}{\\sigma})}
\\frac{1}{\\sqrt{2\\pi}\\sigma}\\
exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right)

- `'laplace'`, `'lin-laplace'`: A laplace distribution:

.. math::
Expand Down Expand Up @@ -147,6 +167,82 @@ def noise_distribution_to_cost_function(
.. math::
r = \\frac{1-\\sigma}{\\sigma} y

- `'left-censored-normal'`: A left-censored normal distribution with
threshold v:

- if m <= v:

.. math::
\\pi(m|y,\\sigma) = \\Phi(\\frac{v-y}{\\sigma})

where \\Phi is the standard normal cumulative distribution function

.. math::
\\Phi(x) = \\frac{1}{2}(1+\\erf(\\frac{x}{\\sqrt{2}}))

- if m > v:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma}\\
exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right)

- `'right-censored-normal'`: A right-censored normal distribution with
threshold v:

- if m < v:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{\\sqrt{2\\pi}\\sigma}\\
exp\\left(-\\frac{(m-y)^2}{2\\sigma^2}\\right)

- if m >= v:

.. math::
\\pi(m|y,\\sigma) = 1-\\Phi(\\frac{v-y}{\\sigma})

where \\Phi is the standard normal cumulative distribution function

.. math::
\\Phi(x) = \\frac{1}{2}(1+\\erf(\\frac{x}{\\sqrt{2}}))

- `'left-censored-laplace'`: A left-censored laplace distribution with
threshold v:

- if m <= v:

.. math::
\\pi(m|y,\\sigma) = \\F(\\frac{v-y}{\\sigma})

where \\Phi is the standard laplace cumulative distribution function

.. math::
\\F(x) = \\frac{1}{2}(1+\\sign(x)(1-\\exp(-|x|)))

- if m > v:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{2\\sigma}
\\exp\\left(-\\frac{|m-y|}{\\sigma}\\right)

- `'right-censored-laplace'`: A right-censored laplace distribution with
threshold v:

- if m < v:

.. math::
\\pi(m|y,\\sigma) = \\frac{1}{2\\sigma}
\\exp\\left(-\\frac{|m-y|}{\\sigma}\\right)

- if m >= v:

.. math::
\\pi(m|y,\\sigma) = 1-\\Phi(\\frac{v-y}{\\sigma})

where \\Phi is the standard laplace cumulative distribution function

.. math::
\\Phi(x) = \\frac{1}{2}(1+\\sign(x)(1-\\exp(-|x|)))

The distributions above are for a single data point.
For a collection :math:`D=\\{m_i\\}_i` of data points and corresponding
simulations :math:`Y=\\{y_i\\}_i` and noise parameters
Expand Down Expand Up @@ -181,27 +277,52 @@ def noise_distribution_to_cost_function(
if isinstance(noise_distribution, Callable):
return noise_distribution

if noise_distribution in ['normal', 'lin-normal']:
noise_distribution_type = noise_distribution['type']

if noise_distribution_type in ['normal', 'lin-normal']:
y_string = '0.5*log(2*pi*{sigma}**2) + 0.5*(({y} - {m}) / {sigma})**2'
elif noise_distribution == 'log-normal':
elif noise_distribution_type == 'log-normal':
y_string = '0.5*log(2*pi*{sigma}**2*{m}**2) ' \
'+ 0.5*((log({y}) - log({m})) / {sigma})**2'
elif noise_distribution == 'log10-normal':
elif noise_distribution_type == 'log10-normal':
y_string = '0.5*log(2*pi*{sigma}**2*{m}**2*log(10)**2) ' \
'+ 0.5*((log({y}, 10) - log({m}, 10)) / {sigma})**2'
elif noise_distribution in ['laplace', 'lin-laplace']:
elif noise_distribution_type in ['left-truncated-normal',
'lin-left-truncated-normal']:
# left-truncated at a
a = noise_distribution['parameters'][0] # TODO
y_string = f'log(1-0.5*(1+erf(({a}-{{y}})/(sqrt(2)*{{sigma}}))))' \
' + 0.5*log(2*pi*{sigma}**2)' \
' + 0.5*(({y} - {m}) / {sigma})**2'
elif noise_distribution_type in ['right-truncated-normal',
'lin-right-truncated-normal']:
# right-truncated at b
b = noise_distribution['parameters'][0] # TODO
y_string = f'log(0.5*(1+erf(({b}-{{y}})/(sqrt(2)*{{sigma}}))))' \
' + 0.5*log(2*pi*{sigma}**2)' \
' + 0.5*(({y} - {m}) / {sigma})**2'
elif noise_distribution_type == 'log-left-truncated-normal':
# truncated at a
a = noise_distribution['parameters'][0] # TODO
# TODO a>0
y_string = f'log(1-0.5*(1+erf(log({a})-log({{y}})/' \
f'(sqrt(2)*{{sigma}}))))' \
' + 0.5*log(2*pi*{sigma}**2*{m}**2)' \
' + 0.5*((log({y}) - log({m})) / {sigma})**2'
elif noise_distribution_type in ['laplace', 'lin-laplace']:
y_string = 'log(2*{sigma}) + Abs({y} - {m}) / {sigma}'
elif noise_distribution == 'log-laplace':
elif noise_distribution_type == 'log-laplace':
y_string = 'log(2*{sigma}*{m}) + Abs(log({y}) - log({m})) / {sigma}'
elif noise_distribution == 'log10-laplace':
elif noise_distribution_type == 'log10-laplace':
y_string = 'log(2*{sigma}*{m}*log(10)) ' \
'+ Abs(log({y}, 10) - log({m}, 10)) / {sigma}'
elif noise_distribution in ['binomial', 'lin-binomial']:
elif noise_distribution_type in ['binomial', 'lin-binomial']:
# Binomial noise model parameterized via success probability p
y_string = '- log(Heaviside({y} - {m})) - loggamma({y}+1) ' \
'+ loggamma({m}+1) + loggamma({y}-{m}+1) ' \
'- {m} * log({sigma}) - ({y} - {m}) * log(1-{sigma})'
elif noise_distribution in ['negative-binomial', 'lin-negative-binomial']:
elif noise_distribution_type in ['negative-binomial',
'lin-negative-binomial']:
# Negative binomial noise model of the number of successes m
# (data) before r=(1-sigma)/sigma * y failures occur,
# with mean number of successes y (simulation),
Expand All @@ -210,9 +331,47 @@ def noise_distribution_to_cost_function(
y_string = f'- loggamma({{m}}+{r}) + loggamma({{m}}+1) ' \
f'+ loggamma({r}) - {r} * log(1-{{sigma}}) ' \
f'- {{m}} * log({{sigma}})'
elif noise_distribution_type in ['lin-left-censored-normal',
'left-censored-normal']:
# left-censored at v (detection limit)
v = noise_distribution['parameters'][0] # TODO
y_string = f'Piecewise((-log(0.5*(1+erf(({v}-{{y}})/' \
f'(sqrt(2)*{{sigma}})))), ' \
f'{{m}}<={v}), ' \
f'(0.5*log(2*pi*{{sigma}}**2) + ' \
f'0.5*(({{y}} - {{m}}) / {{sigma}})**2, ' \
f'{{m}}>{v}))'
elif noise_distribution_type in ['lin-right-censored-normal',
'right-censored-normal']:
# right-censored at v (detection limit)
v = noise_distribution['parameters'][0] # TODO
y_string = f'Piecewise((-log(1-0.5*(1+erf(({v}-{{y}})/' \
f'(sqrt(2)*{{sigma}})))), ' \
f'{{m}}>={v}), ' \
f'(0.5*log(2*pi*{{sigma}}**2) + ' \
f'0.5*(({{y}} - {{m}}) / {{sigma}})**2, ' \
f'{{m}}<{v}))'
elif noise_distribution_type in ['lin-left-censored-laplace',
'left-censored-laplace']:
# left-censored at v (detection limit)
v = noise_distribution['parameters'][0] # TODO
y_string = f'Piecewise((-log(0.5*(1+sign({v}-{{y}})*' \
f'(1-exp(-Abs({v}-{{y}})/{{sigma}}))), ' \
f'{{m}}<={v}), ' \
f'(log(2*{{sigma}}) + Abs({{y}} - {{m}}) / {{sigma}}, ' \
f'{{m}}>{v}))'
elif noise_distribution_type in ['lin-right-censored-laplace',
'right-censored-laplace']:
# right-censored at v (detection limit)
v = noise_distribution['parameters'][0] # TODO
y_string = f'Piecewise((-log(1-0.5*(1+sign({v}-{{y}})*' \
f'(1-exp(-Abs({v}-{{y}})/{{sigma}}))), ' \
f'{{m}}>={v}), ' \
f'(log(2*{{sigma}}) + Abs({{y}} - {{m}}) / {{sigma}}, ' \
f'{{m}}<{v}))'
else:
raise ValueError(
f"Cost identifier {noise_distribution} not recognized.")
f"Cost identifier {noise_distribution_type} not recognized.")

def nllh_y_string(str_symbol):
y, m, sigma = _get_str_symbol_identifiers(str_symbol)
Expand Down
13 changes: 10 additions & 3 deletions python/sdist/amici/petab_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ def import_model_sbml(

def get_observation_model(
observable_df: pd.DataFrame,
) -> Tuple[Dict[str, Dict[str, str]], Dict[str, str],
) -> Tuple[Dict[str, Dict[str, str]], Dict[str, Dict[str, Union[str, list]]],
Dict[str, Union[str, float]]]:
"""
Get observables, sigmas, and noise distributions from PEtab observation
Expand Down Expand Up @@ -763,7 +763,8 @@ def get_observation_model(


def petab_noise_distributions_to_amici(observable_df: pd.DataFrame
) -> Dict[str, str]:
) -> Dict[str,
Dict[str, Union[str, list]]]:
"""
Map from the petab to the amici format of noise distribution
identifiers.
Expand All @@ -789,7 +790,13 @@ def petab_noise_distributions_to_amici(observable_df: pd.DataFrame
amici_val += observable[NOISE_DISTRIBUTION]
else:
amici_val += 'normal'
amici_distrs[observable.name] = amici_val
amici_distrs[observable.name] = {'type': amici_val, 'parameters': []}

DISTRIBUTION_PARAMETERS = 'distributionParameters' # TODO
if DISTRIBUTION_PARAMETERS in observable \
and str(observable[DISTRIBUTION_PARAMETERS]):
amici_distrs[observable.name]['parameters'] = \
str(observable[DISTRIBUTION_PARAMETERS]).split(';')

return amici_distrs

Expand Down
28 changes: 17 additions & 11 deletions python/sdist/amici/sbml_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class SBMLException(Exception):
}

ConservationLaw = Dict[str, Union[str, sp.Expr]]
NoiseDistDict = Dict[str, Union[str, list]]

logger = get_logger(__name__, logging.ERROR)

Expand Down Expand Up @@ -215,7 +216,8 @@ def sbml2amici(
constant_parameters: Iterable[str] = None,
sigmas: Dict[str, Union[str, float]] = None,
event_sigmas: Dict[str, Union[str, float]] = None,
noise_distributions: Dict[str, Union[str, Callable]] = None,
noise_distributions: Optional[
Dict[str, Union[NoiseDistDict, Callable]]] = None,
event_noise_distributions: Dict[str, Union[str, Callable]] = None,
verbose: Union[int, bool] = logging.ERROR,
assume_pow_positivity: bool = False,
Expand Down Expand Up @@ -374,7 +376,8 @@ def _build_ode_model(
constant_parameters: Iterable[str] = None,
sigmas: Dict[str, Union[str, float]] = None,
event_sigmas: Dict[str, Union[str, float]] = None,
noise_distributions: Dict[str, Union[str, Callable]] = None,
noise_distributions: Optional[
Dict[str, Union[NoiseDistDict, Callable]]] = None,
event_noise_distributions: Dict[str, Union[str, Callable]] = None,
verbose: Union[int, bool] = logging.ERROR,
compute_conservation_laws: bool = True,
Expand Down Expand Up @@ -1216,23 +1219,24 @@ def _process_observables(
self,
observables: Union[Dict[str, Dict[str, str]], None],
sigmas: Dict[str, Union[str, float]],
noise_distributions: Dict[str, str]
noise_distributions: Dict[str, NoiseDistDict]
) -> None:
"""
Perform symbolic computations required for observable and objective
function evaluation.

:param observables:
dictionary(observableId: {'name':observableName
(optional), 'formula':formulaString)})
(optional), 'formula':formulaString})
to be added to the model

:param sigmas:
dictionary(observableId: sigma value or (existing)
parameter name)

:param noise_distributions:
dictionary(observableId: noise type)
dictionary(observableId: {'type': noise type, 'parameters':
list with parameter values})
See :py:func:`sbml2amici`.
"""

Expand All @@ -1254,7 +1258,8 @@ def _process_observables(
),
'transformation':
noise_distribution_to_observable_transformation(
noise_distributions.get(obs, 'normal')
noise_distributions.get(obs, {'type': 'normal',
'parameters': []})
)
}
for iobs, (obs, definition) in enumerate(observables.items())
Expand Down Expand Up @@ -1325,7 +1330,7 @@ def _process_event_observables(
'event': sp.Symbol(definition.get('event')),
'transformation':
noise_distribution_to_observable_transformation(
event_noise_distributions.get(obs, 'normal')
event_noise_distributions.get(obs, 'normal') # TODO
)
}
for iobs, (obs, definition) in
Expand Down Expand Up @@ -1383,7 +1388,7 @@ def _generate_default_observables(self):

def _process_log_likelihood(self,
sigmas: Dict[str, Union[str, float]],
noise_distributions: Dict[str, str],
noise_distributions: Dict[str, NoiseDistDict],
events: bool = False,
event_reg: bool = False):
"""
Expand Down Expand Up @@ -1444,7 +1449,8 @@ def _process_log_likelihood(self,
self.symbols[sigma_symbol].items()
):
symbol = symbol_with_assumptions(f'J{obs_id}')
dist = noise_distributions.get(str(obs_id), 'normal')
dist = noise_distributions.get(
str(obs_id), {'type': 'normal', 'parameters': []})
cost_fun = noise_distribution_to_cost_function(dist)(obs_id)
value = sp.sympify(cost_fun, locals=dict(zip(
_get_str_symbol_identifiers(obs_id),
Expand All @@ -1456,7 +1462,7 @@ def _process_log_likelihood(self,
self.symbols[llh_symbol][symbol] = {
'name': f'J{obs["name"]}',
'value': value,
'dist': dist,
'dist': dist['type'],
}

@log_execution_time('processing SBML initial assignments', logger)
Expand Down Expand Up @@ -2328,7 +2334,7 @@ def _parse_special_functions_sbml(sym: sp.Expr,
def _validate_observables(
observables: Union[Dict[str, Dict[str, str]], None],
sigmas: Dict[str, Union[str, float]],
noise_distributions: Dict[str, str],
noise_distributions: Dict[str, NoiseDistDict],
events: bool = False
) -> None:

Expand Down