Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 257 additions & 7 deletions gEconpy/model/statespace.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import warnings

from typing import Literal, get_args
from typing import Any, Literal, get_args

import arviz as az
import numpy as np
Expand All @@ -18,9 +18,12 @@
from pymc_extras.statespace.core.statespace import PyMCStateSpace
from pymc_extras.statespace.models.utilities import make_default_coords
from pymc_extras.statespace.utils.constants import (
EXOGENOUS_DIM,
JITTER_DEFAULT,
OBS_STATE_DIM,
SHOCK_AUX_DIM,
SHOCK_DIM,
TIME_DIM,
)
from pytensor import graph_replace

Expand All @@ -38,6 +41,73 @@
valid_solvers = get_args(SolverType)


def _validate_exog_args(
k_exog: int | dict[str, int] | None,
exog_state_names: list[str] | dict[str, list[str]] | None,
endog_names: list[str],
):
if k_exog is not None and not isinstance(k_exog, int | dict):
raise ValueError("If not None, k_endog must be either an int or a dict")
if exog_state_names is not None and not isinstance(exog_state_names, list | dict):
raise ValueError("If not None, exog_state_names must be either a list or a dict")

if k_exog is None and exog_state_names is None:
return None, None

if k_exog is not None and exog_state_names is not None:
if isinstance(k_exog, int) and isinstance(exog_state_names, list):
if len(exog_state_names) != k_exog:
raise ValueError("Length of exog_state_names does not match provided k_exog")
elif isinstance(k_exog, int) and isinstance(exog_state_names, dict):
raise ValueError("If k_exog is an int, exog_state_names must be a list of the same length (or None)")
elif isinstance(k_exog, dict) and isinstance(exog_state_names, list):
raise ValueError("If k_exog is a dict, exog_state_names must be a dict as well (or None)")
elif isinstance(k_exog, dict) and isinstance(exog_state_names, dict):
if set(k_exog.keys()) != set(exog_state_names.keys()):
raise ValueError("Keys of k_exog and exog_state_names dicts must match")
if not all(len(names) == k for names, k in zip(exog_state_names.values(), k_exog.values(), strict=False)):
raise ValueError(
"If both k_endog and exog_state_names are provided, lengths of exog_state_names "
"lists must match corresponding values in k_exog"
)
unknown_exog = set(exog_state_names.keys()) - set(endog_names)
if unknown_exog:
raise ValueError(
"Cannot specify exogenous variables for the following states which are not observed: "
f"{', '.join(unknown_exog)}"
)

if k_exog is not None and exog_state_names is None:
if isinstance(k_exog, int):
exog_state_names = [f"exogenous_{i}" for i in range(k_exog)]
elif isinstance(k_exog, dict):
unknown_exog = set(k_exog.keys()) - set(endog_names)
if unknown_exog:
raise ValueError(
"Cannot specify exogenous variables for the following states which are not observed: "
f"{', '.join(unknown_exog)}"
)
exog_state_names = {name: [f"{name}_exogenous_{i}" for i in range(k)] for name, k in k_exog.items()}

if k_exog is None and exog_state_names is not None:
if isinstance(exog_state_names, list):
k_exog = len(exog_state_names)
elif isinstance(exog_state_names, dict):
k_exog = {name: len(names) for name, names in exog_state_names.items()}

# If exog_state_names is a dict but 1) all endog variables are among the keys, and 2) all values are the same
# then we can drop back to the list case.
if (
isinstance(exog_state_names, dict)
and set(exog_state_names.keys()) == set(endog_names)
and len({frozenset(val) for val in exog_state_names.values()}) == 1
):
exog_state_names = exog_state_names[endog_names[0]]
k_exog = len(exog_state_names)

return k_exog, exog_state_names


class DSGEStateSpace(PyMCStateSpace):
"""Core class for estimating DSGE models using PyMC."""

Expand Down Expand Up @@ -235,6 +305,58 @@ def make_symbolic_graph(self):
sigma = self.make_and_register_variable(f"error_sigma_{state}", shape=())
self.ssm["obs_cov", i, i] = sigma**2

if self.exog_state_names is not None:
if isinstance(self.exog_state_names, list):
beta_exog = self.make_and_register_variable(
"beta_exog", shape=(self.k_endog, self.k_exog), dtype=floatX
)
exog_data = self.make_and_register_data("exogenous_data", shape=(None, self.k_exog), dtype=floatX)
self._name_to_variable[beta_exog.name] = beta_exog

obs_intercept = exog_data @ beta_exog.T

elif isinstance(self.exog_state_names, dict):
obs_components = []
for name in self.observed_states:
if name in self.exog_state_names:
k_exog = len(self.exog_state_names[name])
beta_exog = self.make_and_register_variable(f"beta_exog_{name}", shape=(k_exog,), dtype=floatX)
self._name_to_variable[beta_exog.name] = beta_exog

exog_data = self.make_and_register_data(
f"{name}_exogenous_data", shape=(None, k_exog), dtype=floatX
)
obs_components.append(pt.expand_dims(exog_data @ beta_exog, axis=-1))
else:
obs_components.append(pt.zeros((1, 1), dtype=floatX))

# TODO: Replace all of this with pt.concat_with_broadcast once PyMC works with pytensor >= 2.32

# If there were any zeros, they need to be broadcast against the non-zeros.
# Core shape is the last dim, the time dim is always broadcast
non_concat_shape = [1, None]

# Look for the first non-zero component to get the shape from
for tensor_inp in obs_components:
for i, (bcast, sh) in enumerate(zip(tensor_inp.type.broadcastable, tensor_inp.shape, strict=False)):
if bcast or i == 1:
continue
non_concat_shape[i] = sh

assert non_concat_shape.count(None) == 1

bcast_tensor_inputs = []
for tensor_inp in obs_components:
non_concat_shape[1] = tensor_inp.shape[1]
bcast_tensor_inputs.append(pt.broadcast_to(tensor_inp, non_concat_shape))

obs_intercept = pt.join(1, *bcast_tensor_inputs)

else:
raise NotImplementedError()

self.ssm["obs_intercept"] = obs_intercept

self.ssm["initial_state", :] = pt.zeros(self.k_states)

Q = self.ssm["state_cov"]
Expand All @@ -245,6 +367,8 @@ def configure(
observed_states: list[str],
measurement_error: list[str] | None = None,
constant_params: list[str] | None = None,
exog_state_names: list[str] | dict[str, list[str]] | None = None,
k_exog: int | dict[str, int] | None = None,
full_shock_covaraince: bool = False,
solver: SolverType = "gensys",
mode: str | None = None,
Expand All @@ -253,6 +377,50 @@ def configure(
tol: float = 1e-6,
use_adjoint_gradients: bool = True,
):
"""
Finalize the setup of a DSGE Statespace object.

Parameters
----------
observed_states: list of str
Names of the endogenous variables to be treated as observed timeseries. Must be a subset of the model's
endogenous variables.
measurement_error: list of str, optional
Names of the observed states to include measurement error for. Must be a subset of the observed_states.
If None, no measurement error will be included.
constant_params: list of str, optional
Names of the parameters to treat as constant (not estimated). Must be a subset of the model's parameters.
If None, all parameters will be estimated.
exog_state_names : list of str or dict, optional
Names of the exogenous state variables. If a list, all endogenous variables will share the same exogenous
variables. If a dict, keys should be the names of the endogenous variables, and values should be lists of
the exogenous variable names for that endogenous variable. Endogenous variables not included in the dict
will be assumed to have no exogenous variables. If None, no exogenous variables will be included.
k_exog : int or dict, optional
Number of exogenous variables. If an int, all endogenous variables will share the same number of exogenous
variables. If a dict, keys should be the names of the endogenous variables, and values should be the number
of exogenous variables for that endogenous variable. Endogenous variables not included in the dict will be
assumed to have no exogenous variables. If None, no exogenous variables will be included.
full_shock_covaraince: bool, default False
If True, estimate a full covariance matrix for the structural shocks. If False, estimate only the
diagonal elements (i.e. assume shocks are uncorrelated).
solver: str, default "gensys"
Which solver to use for solving the linearized system. Must be one of "gensys", "cycle_reduction",
or "scan_cycle_reduction".
mode: str, optional
Pytensor compilation mode to use for post-estimation methods. If None, the default mode will be used.
verbose: bool, default True
If True, print diagnostic messages during configuration.
max_iter: int, default 50
For iterative solvers ("cycle_reduction" and "scan_cycle_reduction"), the maximum number of iterations to
perform. Ignored if solver is "gensys".
tol: float, default 1e-6
Tolerance for convergence for iterative solvers ("cycle_reduction" and "scan_cycle_reduction"). Ignored if
solver is "gensys".
use_adjoint_gradients: bool, default True
If True, use adjoint gradients for the scan_cycle_reduction solver. Ignored if solver is not
"scan_cycle_reduction".
"""
# Set up observed states
unknown_states = [x for x in observed_states if x not in self.state_names]
if len(unknown_states) > 0:
Expand Down Expand Up @@ -314,10 +482,16 @@ def configure(
"use_adjoint_gradients": use_adjoint_gradients,
}

# Handle exogenous data requirements, if any
k_exog, exog_state_names = _validate_exog_args(k_exog, exog_state_names, endog_names=observed_states)

self._obs_state_names = observed_states
self.error_states = measurement_error
self.constant_parameters = constant_params

self.exog_state_names = exog_state_names
self.k_exog = k_exog

self.full_covariance = full_shock_covaraince
self._configured = True
self._solver = solver
Expand Down Expand Up @@ -347,6 +521,12 @@ def param_names(self):
if self.measurement_error:
param_names += [f"error_sigma_{state}" for state in self.error_states]

if self._configured and isinstance(self.exog_state_names, list):
param_names += ["beta_exog"]

elif self._configured and isinstance(self.exog_state_names, dict):
param_names += [f"beta_exog_{name}" for name in self.exog_state_names]

return param_names

@property
Expand All @@ -362,15 +542,30 @@ def observed_states(self):
return self._obs_state_names

@property
def param_dims(self):
def data_names(self) -> list[str]:
if not self._configured:
return {}

return {param: None if param != "state_cov" else (SHOCK_DIM, SHOCK_AUX_DIM) for param in self.param_names}
return []
if isinstance(self.exog_state_names, list):
return ["exogenous_data"]
if isinstance(self.exog_state_names, dict):
return [f"{endog_state}_exogenous_data" for endog_state in self.exog_state_names]
return []

@property
def coords(self):
return make_default_coords(self)
def param_dims(self):
var_to_dims = {
param: None if param != "state_cov" else (SHOCK_DIM, SHOCK_AUX_DIM) for param in self.param_names
}

if self._configured and isinstance(self.exog_state_names, list):
var_to_dims["beta_exog"] = (OBS_STATE_DIM, EXOGENOUS_DIM)
elif self._configured and isinstance(self.exog_state_names, dict):
# If each state has its own exogenous variables, each parameter needs it own dim, since we expect the
# dim labels to all be different (otherwise we'd be in the list case).
for name in self.exog_state_names:
var_to_dims[f"beta_{name}"] = (f"{EXOGENOUS_DIM}_{name}",)

return var_to_dims

@property
def param_info(self):
Expand All @@ -392,12 +587,67 @@ def param_info(self):
else:
info[var]["constraints"] = None

if self._configured and isinstance(self.exog_state_names, list):
k_exog = len(self.exog_state_names)
info["beta_exog"] = {
"shape": (self.k_endog, k_exog),
"constraints": "None",
}

elif self._configured and isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
k_exog = len(exog_names)
info[f"beta_exog_{name}"] = {
"shape": (k_exog,),
"constraints": "None",
}

# Lazy way to add the dims without making any typos
for name in self.param_names:
info[name]["dims"] = self.param_dims[name]

return info

@property
def data_info(self) -> dict[str, dict[str, Any]]:
info = None

if not self._configured:
return info

if isinstance(self.exog_state_names, list):
info = {
"exogenous_data": {
"dims": (TIME_DIM, EXOGENOUS_DIM),
"shape": (None, self.k_exog),
}
}

elif isinstance(self.exog_state_names, dict):
info = {
f"{endog_state}_exogenous_data": {
"dims": (TIME_DIM, f"{EXOGENOUS_DIM}_{endog_state}"),
"shape": (None, len(exog_names)),
}
for endog_state, exog_names in self.exog_state_names.items()
}

return info

@property
def coords(self):
coords = make_default_coords(self)
if not self._configured:
return coords

if isinstance(self.exog_state_names, list):
coords[EXOGENOUS_DIM] = self.exog_state_names
elif isinstance(self.exog_state_names, dict):
for name, exog_names in self.exog_state_names.items():
coords[f"{EXOGENOUS_DIM}_{name}"] = exog_names

return coords

def build_statespace_graph(
self,
data: np.ndarray | pd.DataFrame | pt.TensorVariable,
Expand Down
Loading
Loading