Skip to content

Allow multivariate components to share latent states #558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 5, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class AutoregressiveComponent(Component):
observed_state_names: list[str] | None, default None
List of strings for observed state labels. If None, defaults to ["data"].

share_states: bool, default False
Whether latent states are shared across the observed states. If True, there will be only one set of latent
states, which are observed by all observed states. If False, each observed state has its own set of
latent states. This argument has no effect if `k_endog` is 1.

Notes
-----
An autoregressive component can be thought of as a way o introducing serially correlated errors into the model.
Expand Down Expand Up @@ -73,45 +78,59 @@ def __init__(
order: int = 1,
name: str = "auto_regressive",
observed_state_names: list[str] | None = None,
share_states: bool = False,
):
if observed_state_names is None:
observed_state_names = ["data"]

k_posdef = k_endog = len(observed_state_names)
k_endog = len(observed_state_names)
k_endog_effective = k_posdef = 1 if share_states else k_endog

order = order_to_mask(order)
ar_lags = np.flatnonzero(order).ravel().astype(int) + 1
k_states = len(order)

self.share_states = share_states
self.order = order
self.ar_lags = ar_lags

super().__init__(
name=name,
k_endog=k_endog,
k_states=k_states * k_endog,
k_states=k_states * k_endog_effective,
k_posdef=k_posdef,
measurement_error=True,
combine_hidden_states=True,
observed_state_names=observed_state_names,
obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog),
obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog_effective),
share_states=share_states,
)

def populate_component_properties(self):
k_states = self.k_states // self.k_endog # this is also the number of AR lags
k_endog = self.k_endog
k_endog_effective = 1 if self.share_states else k_endog

self.state_names = [
f"L{i + 1}[{state_name}]"
for state_name in self.observed_state_names
for i in range(k_states)
]
k_states = self.k_states // k_endog_effective # this is also the number of AR lags
base_names = [f"L{i + 1}_{self.name}" for i in range(k_states)]

if self.share_states:
self.state_names = [f"{name}[shared]" for name in base_names]
self.shock_names = [f"{self.name}[shared]"]
else:
self.state_names = [
f"{name}[{state_name}]"
for state_name in self.observed_state_names
for name in base_names
]
self.shock_names = [
f"{self.name}[{obs_name}]" for obs_name in self.observed_state_names
]

self.shock_names = [f"{self.name}[{obs_name}]" for obs_name in self.observed_state_names]
self.param_names = [f"params_{self.name}", f"sigma_{self.name}"]
self.param_dims = {f"params_{self.name}": (f"lag_{self.name}",)}
self.coords = {f"lag_{self.name}": self.ar_lags.tolist()}

if self.k_endog > 1:
if k_endog_effective > 1:
self.param_dims[f"params_{self.name}"] = (
f"endog_{self.name}",
f"lag_{self.name}",
Expand Down Expand Up @@ -140,26 +159,29 @@ def populate_component_properties(self):

def make_symbolic_graph(self) -> None:
k_endog = self.k_endog
k_states = self.k_states // k_endog
k_endog_effective = 1 if self.share_states else k_endog

k_states = self.k_states // k_endog_effective
k_posdef = self.k_posdef

k_nonzero = int(sum(self.order))
ar_params = self.make_and_register_variable(
f"params_{self.name}", shape=(k_nonzero,) if k_endog == 1 else (k_endog, k_nonzero)
f"params_{self.name}",
shape=(k_nonzero,) if k_endog_effective == 1 else (k_endog_effective, k_nonzero),
)
sigma_ar = self.make_and_register_variable(
f"sigma_{self.name}", shape=() if k_endog == 1 else (k_endog,)
f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,)
)

if k_endog == 1:
if k_endog_effective == 1:
T = pt.eye(k_states, k=-1)
ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0])
T = T[ar_idx].set(ar_params)

else:
transition_matrices = []

for i in range(k_endog):
for i in range(k_endog_effective):
T = pt.eye(k_states, k=-1)
ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0])
T = T[ar_idx].set(ar_params[i])
Expand All @@ -171,18 +193,21 @@ def make_symbolic_graph(self) -> None:
self.ssm["transition", :, :] = T

R = np.eye(k_states)
R_mask = np.full((k_states), False)
R_mask = np.full((k_states,), False)
R_mask[0] = True
R = R[:, R_mask]

self.ssm["selection", :, :] = pt.specify_shape(
pt.linalg.block_diag(*[R for _ in range(k_endog)]), (self.k_states, self.k_posdef)
pt.linalg.block_diag(*[R for _ in range(k_endog_effective)]), (self.k_states, k_posdef)
)

Z = pt.zeros((1, k_states))[0, 0].set(1.0)
self.ssm["design", :, :] = pt.specify_shape(
pt.linalg.block_diag(*[Z for _ in range(k_endog)]), (self.k_endog, self.k_states)
)
Zs = [pt.zeros((1, k_states))[0, 0].set(1.0) for _ in range(k_endog)]

if self.share_states:
Z = pt.join(0, *Zs)
else:
Z = pt.linalg.block_diag(*Zs)
self.ssm["design", :, :] = pt.specify_shape(Z, (k_endog, self.k_states))

cov_idx = ("state_cov", *np.diag_indices(k_posdef))
self.ssm[cov_idx] = sigma_ar**2
66 changes: 43 additions & 23 deletions pymc_extras/statespace/models/structural/components/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class CycleComponent(Component):
Names of the observed state variables. For univariate time series, defaults to ``["data"]``.
For multivariate time series, specify a list of names for each endogenous variable.

share_states: bool, default False
Whether latent states are shared across the observed states. If True, there will be only one set of latent
states, which are observed by all observed states. If False, each observed state has its own set of
latent states. This argument has no effect if `k_endog` is 1.

Notes
-----
The cycle component is very similar in implementation to the frequency domain seasonal component, expect that it
Expand Down Expand Up @@ -155,6 +160,7 @@ def __init__(
dampen: bool = False,
innovations: bool = True,
observed_state_names: list[str] | None = None,
share_states: bool = False,
):
if observed_state_names is None:
observed_state_names = ["data"]
Expand All @@ -167,6 +173,7 @@ def __init__(
cycle = int(cycle_length) if cycle_length is not None else "Estimate"
name = f"Cycle[s={cycle}, dampen={dampen}, innovations={innovations}]"

self.share_states = share_states
self.estimate_cycle_length = estimate_cycle_length
self.cycle_length = cycle_length
self.innovations = innovations
Expand All @@ -175,8 +182,8 @@ def __init__(

k_endog = len(observed_state_names)

k_states = 2 * k_endog
k_posdef = 2 * k_endog
k_states = 2 if share_states else 2 * k_endog
k_posdef = 2 if share_states else 2 * k_endog

obs_state_idx = np.zeros(k_states)
obs_state_idx[slice(0, k_states, 2)] = 1
Expand All @@ -190,21 +197,26 @@ def __init__(
combine_hidden_states=True,
obs_state_idxs=obs_state_idx,
observed_state_names=observed_state_names,
share_states=share_states,
)

def make_symbolic_graph(self) -> None:
k_endog = self.k_endog
k_endog_effective = 1 if self.share_states else k_endog

Z = np.array([1.0, 0.0]).reshape((1, -1))
design_matrix = block_diag(*[Z for _ in range(self.k_endog)])
design_matrix = block_diag(*[Z for _ in range(k_endog_effective)])
self.ssm["design", :, :] = pt.as_tensor_variable(design_matrix)

# selection matrix R defines structure of innovations (always identity for cycle components)
# when innovations=False, state cov Q=0, hence R @ Q @ R.T = 0
R = np.eye(2) # 2x2 identity for each cycle component
selection_matrix = block_diag(*[R for _ in range(self.k_endog)])
selection_matrix = block_diag(*[R for _ in range(k_endog_effective)])
self.ssm["selection", :, :] = pt.as_tensor_variable(selection_matrix)

init_state = self.make_and_register_variable(
f"params_{self.name}", shape=(self.k_endog, 2) if self.k_endog > 1 else (self.k_states,)
f"params_{self.name}",
shape=(k_endog_effective, 2) if k_endog_effective > 1 else (self.k_states,),
)
self.ssm["initial_state", :] = init_state.ravel()

Expand All @@ -219,37 +231,45 @@ def make_symbolic_graph(self) -> None:
rho = 1

T = rho * _frequency_transition_block(lamb, j=1)
transition = block_diag(*[T for _ in range(self.k_endog)])
transition = block_diag(*[T for _ in range(k_endog_effective)])
self.ssm["transition"] = pt.specify_shape(transition, (self.k_states, self.k_states))

if self.innovations:
if self.k_endog == 1:
if k_endog_effective == 1:
sigma_cycle = self.make_and_register_variable(f"sigma_{self.name}", shape=())
self.ssm["state_cov", :, :] = pt.eye(self.k_posdef) * sigma_cycle**2
else:
sigma_cycle = self.make_and_register_variable(
f"sigma_{self.name}", shape=(self.k_endog,)
f"sigma_{self.name}", shape=(k_endog_effective,)
)
state_cov = block_diag(
*[pt.eye(2) * sigma_cycle[i] ** 2 for i in range(self.k_endog)]
*[pt.eye(2) * sigma_cycle[i] ** 2 for i in range(k_endog_effective)]
)
self.ssm["state_cov"] = pt.specify_shape(state_cov, (self.k_states, self.k_states))
else:
# explicitly set state cov to 0 when no innovations
self.ssm["state_cov", :, :] = pt.zeros((self.k_posdef, self.k_posdef))

def populate_component_properties(self):
self.state_names = [
f"{f}_{self.name}[{var_name}]" if self.k_endog > 1 else f"{f}_{self.name}"
for var_name in self.observed_state_names
for f in ["Cos", "Sin"]
]
k_endog = self.k_endog
k_endog_effective = 1 if self.share_states else k_endog

base_names = [f"{f}_{self.name}" for f in ["Cos", "Sin"]]

if self.share_states:
self.state_names = [f"{name}[shared]" for name in base_names]
else:
self.state_names = [
f"{name}[{var_name}]" if k_endog_effective > 1 else name
for var_name in self.observed_state_names
for name in base_names
]

self.param_names = [f"params_{self.name}"]

if self.k_endog == 1:
if k_endog_effective == 1:
self.param_dims = {f"params_{self.name}": (f"state_{self.name}",)}
self.coords = {f"state_{self.name}": self.state_names}
self.coords = {f"state_{self.name}": base_names}
self.param_info = {
f"params_{self.name}": {
"shape": (2,),
Expand All @@ -265,7 +285,7 @@ def populate_component_properties(self):
}
self.param_info = {
f"params_{self.name}": {
"shape": (self.k_endog, 2),
"shape": (k_endog_effective, 2),
"constraints": None,
"dims": (f"endog_{self.name}", f"state_{self.name}"),
}
Expand All @@ -274,22 +294,22 @@ def populate_component_properties(self):
if self.estimate_cycle_length:
self.param_names += [f"length_{self.name}"]
self.param_info[f"length_{self.name}"] = {
"shape": () if self.k_endog == 1 else (self.k_endog,),
"shape": () if k_endog_effective == 1 else (k_endog_effective,),
"constraints": "Positive, non-zero",
"dims": None if self.k_endog == 1 else (f"endog_{self.name}",),
"dims": None if k_endog_effective == 1 else (f"endog_{self.name}",),
}

if self.dampen:
self.param_names += [f"dampening_factor_{self.name}"]
self.param_info[f"dampening_factor_{self.name}"] = {
"shape": () if self.k_endog == 1 else (self.k_endog,),
"shape": () if k_endog_effective == 1 else (k_endog_effective,),
"constraints": "0 < x ≤ 1",
"dims": None if self.k_endog == 1 else (f"endog_{self.name}",),
"dims": None if k_endog_effective == 1 else (f"endog_{self.name}",),
}

if self.innovations:
self.param_names += [f"sigma_{self.name}"]
if self.k_endog == 1:
if k_endog_effective == 1:
self.param_info[f"sigma_{self.name}"] = {
"shape": (),
"constraints": "Positive",
Expand All @@ -298,7 +318,7 @@ def populate_component_properties(self):
else:
self.param_dims[f"sigma_{self.name}"] = (f"endog_{self.name}",)
self.param_info[f"sigma_{self.name}"] = {
"shape": (self.k_endog,),
"shape": (k_endog_effective,),
"constraints": "Positive",
"dims": (f"endog_{self.name}",),
}
Expand Down
Loading
Loading