From f6564da4caa9bd413673a61cafe7ebf34e9699e9 Mon Sep 17 00:00:00 2001
From: jessegrabowski
Date: Sun, 3 Aug 2025 23:57:36 +0800
Subject: [PATCH 01/10] Add shared_state argument to LevelTrendComponent

---
 .../structural/components/level_trend.py      |  89 +++++++++----
 .../structural/components/test_level_trend.py | 125 ++++++++++++++++++
 2 files changed, 185 insertions(+), 29 deletions(-)

diff --git a/pymc_extras/statespace/models/structural/components/level_trend.py b/pymc_extras/statespace/models/structural/components/level_trend.py
index ba44c706..95987ea9 100644
--- a/pymc_extras/statespace/models/structural/components/level_trend.py
+++ b/pymc_extras/statespace/models/structural/components/level_trend.py
@@ -13,13 +13,11 @@ class LevelTrendComponent(Component):
     Parameters
     ----------
     order : int
-
         Number of time derivatives of the trend to include in the model. For example, when order=3, the trend will
         be of the form ``y = a + b * t + c * t ** 2``, where the coefficients ``a, b, c`` come from the initial
         state values.

     innovations_order : int or sequence of int, optional
-
         The number of stochastic innovations to include in the model. By default, ``innovations_order = order``

     name : str, default "level_trend"
@@ -28,6 +26,11 @@ class LevelTrendComponent(Component):
     observed_state_names : list[str] | None, default None
         List of strings for observed state labels. If None, defaults to ["data"].

+    share_states: bool, default False
+        Whether latent states are shared across the observed states. If True, there will be only one set of latent
+        states, which are observed by all observed states. If False, each observed state has its own set of
+        latent states. This argument has no effect if `k_endog` is 1.
+
     Notes
     -----
     This class implements the level and trend components of the general structural time series model. In the most
@@ -120,7 +123,10 @@ def __init__(
         innovations_order: int | list[int] | None = None,
         name: str = "level_trend",
         observed_state_names: list[str] | None = None,
+        share_states: bool = False,
     ):
+        self.share_states = share_states
+
         if innovations_order is None:
             innovations_order = order

@@ -156,37 +162,50 @@ def __init__(
         super().__init__(
             name,
             k_endog=k_endog,
-            k_states=k_states * k_endog,
-            k_posdef=k_posdef * k_endog,
+            k_states=k_states * k_endog if not share_states else k_states,
+            k_posdef=k_posdef * k_endog if not share_states else k_posdef,
             observed_state_names=observed_state_names,
             measurement_error=False,
             combine_hidden_states=False,
-            obs_state_idxs=np.tile(np.array([1.0] + [0.0] * (k_states - 1)), k_endog),
+            obs_state_idxs=np.tile(
+                np.array([1.0] + [0.0] * (k_states - 1)), k_endog if not share_states else 1
+            ),
         )

     def populate_component_properties(self):
         k_endog = self.k_endog
-        k_states = self.k_states // k_endog
-        k_posdef = self.k_posdef // k_endog
+        k_endog_effective = 1 if self.share_states else k_endog
+
+        k_states = self.k_states // k_endog_effective
+        k_posdef = self.k_posdef // k_endog_effective

         name_slice = POSITION_DERIVATIVE_NAMES[:k_states]
         self.param_names = [f"initial_{self.name}"]
         base_names = [name for name, mask in zip(name_slice, self._order_mask) if mask]
-        self.state_names = [
-            f"{name}[{obs_name}]" for obs_name in self.observed_state_names for name in base_names
-        ]
+
+        if self.share_states:
+            self.state_names = [f"{name}[{self.name}_shared]" for name in base_names]
+        else:
+            self.state_names = [
+                f"{name}[{obs_name}]"
+                for obs_name in self.observed_state_names
+                for name in base_names
+            ]
+
         self.param_dims = {f"initial_{self.name}": (f"state_{self.name}",)}
         self.coords = {f"state_{self.name}": base_names}

         if k_endog > 1:
+            self.coords[f"endog_{self.name}"] = self.observed_state_names
+
+        if k_endog_effective > 1:
             self.param_dims[f"state_{self.name}"] = (
                 f"endog_{self.name}",
                 f"state_{self.name}",
             )
             self.param_dims = {f"initial_{self.name}": (f"endog_{self.name}", f"state_{self.name}")}
-            self.coords[f"endog_{self.name}"] = self.observed_state_names

-        shape = (k_endog, k_states) if k_endog > 1 else (k_states,)
+        shape = (k_endog_effective, k_states) if k_endog_effective > 1 else (k_states,)
         self.param_info = {f"initial_{self.name}": {"shape": shape, "constraints": None}}

         if self.k_posdef > 0:
@@ -196,20 +215,23 @@ def populate_component_properties(self):
                 name for name, mask in zip(name_slice, self.innovations_order) if mask
             ]

-            self.shock_names = [
-                f"{name}[{obs_name}]"
-                for obs_name in self.observed_state_names
-                for name in base_shock_names
-            ]
+            if self.share_states:
+                self.shock_names = [f"{name}[{self.name}_shared]" for name in base_shock_names]
+            else:
+                self.shock_names = [
+                    f"{name}[{obs_name}]"
+                    for obs_name in self.observed_state_names
+                    for name in base_shock_names
+                ]

             self.param_dims[f"sigma_{self.name}"] = (
                 (f"shock_{self.name}",)
-                if k_endog == 1
+                if k_endog_effective == 1
                 else (f"endog_{self.name}", f"shock_{self.name}")
             )
             self.coords[f"shock_{self.name}"] = base_shock_names
             self.param_info[f"sigma_{self.name}"] = {
-                "shape": (k_posdef,) if k_endog == 1 else (k_endog, k_posdef),
+                "shape": (k_posdef,) if k_endog_effective == 1 else (k_endog_effective, k_posdef),
                 "constraints": "Positive",
             }

@@ -218,12 +240,14 @@ def populate_component_properties(self):

     def make_symbolic_graph(self) -> None:
         k_endog = self.k_endog
-        k_states = self.k_states // k_endog
-        k_posdef = self.k_posdef // k_endog
+        k_endog_effective = 1 if self.share_states else k_endog
+
+        k_states = self.k_states // k_endog_effective
+        k_posdef = self.k_posdef // k_endog_effective

         initial_trend = self.make_and_register_variable(
             f"initial_{self.name}",
-            shape=(k_states,) if k_endog == 1 else (k_endog, k_states),
+            shape=(k_states,) if k_endog_effective == 1 else (k_endog, k_states),
         )
         self.ssm["initial_state", :] = initial_trend.ravel()

@@ -231,27 +255,34 @@ def make_symbolic_graph(self) -> None:
         T = pt.zeros((k_states, k_states))[triu_idx[0], triu_idx[1]].set(1)

         self.ssm["transition", :, :] = pt.specify_shape(
-            pt.linalg.block_diag(*[T for _ in range(k_endog)]), (self.k_states, self.k_states)
+            pt.linalg.block_diag(*[T for _ in range(k_endog_effective)]),
+            (self.k_states, self.k_states),
         )

         R = np.eye(k_states)
         R = R[:, self.innovations_order]
         self.ssm["selection", :, :] = pt.specify_shape(
-            pt.linalg.block_diag(*[R for _ in range(k_endog)]), (self.k_states, self.k_posdef)
+            pt.linalg.block_diag(*[R for _ in range(k_endog_effective)]),
+            (self.k_states, self.k_posdef),
         )

         Z = np.array([1.0] + [0.0] * (k_states - 1)).reshape((1, -1))
-        self.ssm["design", :, :] = pt.specify_shape(
-            pt.linalg.block_diag(*[Z for _ in range(k_endog)]), (self.k_endog, self.k_states)
-        )
+        if self.share_states:
+            self.ssm["design", :, :] = pt.specify_shape(
+                pt.join(0, *[Z for _ in range(k_endog)]), (self.k_endog, self.k_states)
+            )
+        else:
+            self.ssm["design", :, :] = pt.specify_shape(
+                pt.linalg.block_diag(*[Z for _ in range(k_endog)]), (self.k_endog, self.k_states)
+            )

         if k_posdef > 0:
             sigma_trend = self.make_and_register_variable(
                 f"sigma_{self.name}",
-                shape=(k_posdef,) if k_endog == 1 else (k_endog, k_posdef),
+                shape=(k_posdef,) if k_endog_effective == 1 else (k_endog, k_posdef),
             )

-            diag_idx = np.diag_indices(k_posdef * k_endog)
+            diag_idx = np.diag_indices(k_posdef * k_endog_effective)
             idx = np.s_["state_cov", diag_idx[0], diag_idx[1]]
             self.ssm[idx] = (sigma_trend**2).ravel()

diff --git a/tests/statespace/models/structural/components/test_level_trend.py b/tests/statespace/models/structural/components/test_level_trend.py
index e3ca2e3e..0ccdb113 100644
--- a/tests/statespace/models/structural/components/test_level_trend.py
+++ b/tests/statespace/models/structural/components/test_level_trend.py
@@ -91,6 +91,44 @@ def test_level_trend_multiple_observed_construction():
     )


+def test_level_trend_multiple_shared_construction():
+    mod = st.LevelTrendComponent(
+        order=2, innovations_order=1, observed_state_names=["data_1", "data_2"], share_states=True
+    )
+    mod = mod.build(verbose=False)
+
+    assert mod.k_endog == 2
+    assert mod.k_states == 2
+    assert mod.k_posdef == 1
+
+    assert mod.coords["state_level_trend"] == ["level", "trend"]
+    assert mod.coords["endog_level_trend"] == ["data_1", "data_2"]
+
+    assert mod.state_names == [
+        "level[level_trend_shared]",
+        "trend[level_trend_shared]",
+    ]
+    assert mod.shock_names == ["level[level_trend_shared]"]
+
+    Z, T, R = pytensor.function(
+        [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE"
+    )()
+
+    np.testing.assert_allclose(
+        Z,
+        np.array(
+            [
+                [1.0, 0.0],
+                [1.0, 0.0],
+            ]
+        ),
+    )
+
+    np.testing.assert_allclose(T, np.array([[1.0, 1.0], [0.0, 1.0]]))
+
+    np.testing.assert_allclose(R, np.array([[1.0], [0.0]]))
+
+
 def test_level_trend_multiple_observed(rng):
     mod = st.LevelTrendComponent(
         order=2, innovations_order=0, observed_state_names=["data_1", "data_2", "data_3"]
@@ -102,6 +140,19 @@ def test_level_trend_multiple_observed(rng):
     assert (np.diff(x, axis=0) ==
np.array([[1.0, 0.0, 2.0, 0.0, 3.0, 0.0]])).all().all() +def test_level_trend_multiple_shared_observed(rng): + mod = st.LevelTrendComponent( + order=2, + innovations_order=0, + observed_state_names=["data_1", "data_2", "data_3"], + share_states=True, + ) + params = {"initial_level_trend": np.array([10.0, 0.1])} + x, y = simulate_from_numpy_model(mod, rng, params) + np.testing.assert_allclose(y[:, 0], y[:, 1]) + np.testing.assert_allclose(y[:, 0], y[:, 2]) + + def test_add_level_trend_with_different_observed(): mod_1 = st.LevelTrendComponent( name="ll", order=2, innovations_order=[0, 1], observed_state_names=["data_1"] @@ -156,3 +207,77 @@ def test_add_level_trend_with_different_observed(): ] ), ) + + +def test_mixed_shared_and_not_shared(): + mod_1 = st.LevelTrendComponent( + name="individual", + order=2, + innovations_order=[0, 1], + observed_state_names=["data_1", "data_2"], + ) + mod_2 = st.LevelTrendComponent( + name="joint", + order=2, + innovations_order=[1, 1], + observed_state_names=["data_1", "data_2"], + share_states=True, + ) + + mod = (mod_1 + mod_2).build(verbose=False) + + assert mod.k_endog == 2 + assert mod.k_states == 6 + assert mod.k_posdef == 4 + + assert mod.state_names == [ + "level[data_1]", + "trend[data_1]", + "level[data_2]", + "trend[data_2]", + "level[joint_shared]", + "trend[joint_shared]", + ] + + assert mod.shock_names == [ + "trend[data_1]", + "trend[data_2]", + "level[joint_shared]", + "trend[joint_shared]", + ] + + Z, T, R = pytensor.function( + [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" + )() + + np.testing.assert_allclose( + Z, np.array([[1.0, 0.0, 0.0, 0.0, 1.0, 0.0], [0.0, 0.0, 1.0, 0.0, 1.0, 0.0]]) + ) + + np.testing.assert_allclose( + T, + np.array( + [ + [1.0, 1.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 1.0, 1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 1.0], + ] + ), + ) + + np.testing.assert_allclose( + R, + np.array( + [ + [0.0, 0.0, 0.0, 0.0], + [1.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.0], + [0.0, 0.0, 0.0, 1.0], + ] + ), + ) From 1f62c19d227682fc79d8af894ef266362b7855b2 Mon Sep 17 00:00:00 2001 From: jessegrabowski Date: Mon, 4 Aug 2025 10:05:42 +0800 Subject: [PATCH 02/10] Add shared_state argument to TimeSeasonality --- .../structural/components/seasonality.py | 75 ++++++++----- .../structural/components/test_seasonality.py | 102 ++++++++++++++++++ 2 files changed, 152 insertions(+), 25 deletions(-) diff --git a/pymc_extras/statespace/models/structural/components/seasonality.py b/pymc_extras/statespace/models/structural/components/seasonality.py index 05e7d41a..218ed9b8 100644 --- a/pymc_extras/statespace/models/structural/components/seasonality.py +++ b/pymc_extras/statespace/models/structural/components/seasonality.py @@ -44,6 +44,11 @@ class TimeSeasonality(Component): observed_state_names: list[str] | None, default None List of strings for observed state labels. If None, defaults to ["data"]. + share_states: bool, default False + Whether latent states are shared across the observed states. If True, there will be only one set of latent + states, which are observed by all observed states. If False, each observed state has its own set of + latent states. This argument has no effect if `k_endog` is 1. + Notes ----- A seasonal effect is any pattern that repeats at fixed intervals. 
There are several ways to model such effects; @@ -235,6 +240,7 @@ def __init__( state_names: list | None = None, remove_first_state: bool = True, observed_state_names: list[str] | None = None, + share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] @@ -261,6 +267,7 @@ def __init__( ) state_names = state_names.copy() + self.share_states = share_states self.innovations = innovations self.duration = duration self.remove_first_state = remove_first_state @@ -281,23 +288,33 @@ def __init__( super().__init__( name=name, k_endog=k_endog, - k_states=k_states * k_endog, - k_posdef=k_posdef * k_endog, + k_states=k_states if share_states else k_states * k_endog, + k_posdef=k_posdef if share_states else k_posdef * k_endog, observed_state_names=observed_state_names, measurement_error=False, combine_hidden_states=True, - obs_state_idxs=np.tile(np.array([1.0] + [0.0] * (k_states - 1)), k_endog), + obs_state_idxs=np.tile( + np.array([1.0] + [0.0] * (k_states - 1)), 1 if share_states else k_endog + ), ) def populate_component_properties(self): - k_states = self.k_states // self.k_endog k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + + k_states = self.k_states // k_endog_effective + + if self.share_states: + self.state_names = [ + f"{state_name}[{self.name}_shared]" for state_name in self.provided_state_names + ] + else: + self.state_names = [ + f"{state_name}[{endog_name}]" + for endog_name in self.observed_state_names + for state_name in self.provided_state_names + ] - self.state_names = [ - f"{state_name}[{endog_name}]" - for endog_name in self.observed_state_names - for state_name in self.provided_state_names - ] self.param_names = [f"params_{self.name}"] self.param_info = { @@ -305,20 +322,20 @@ def populate_component_properties(self): "shape": (k_states,) if k_endog == 1 else (k_endog, k_states), "constraints": None, "dims": (f"state_{self.name}",) - if k_endog == 1 + if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}"), } } self.param_dims = { f"params_{self.name}": (f"state_{self.name}",) - if k_endog == 1 + if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}") } self.coords = ( {f"state_{self.name}": self.provided_state_names} - if k_endog == 1 + if k_endog_effective == 1 else { f"endog_{self.name}": self.observed_state_names, f"state_{self.name}": self.provided_state_names, @@ -327,21 +344,27 @@ def populate_component_properties(self): if self.innovations: self.param_names += [f"sigma_{self.name}"] - self.shock_names = [f"{self.name}[{name}]" for name in self.observed_state_names] self.param_info[f"sigma_{self.name}"] = { - "shape": () if k_endog == 1 else (k_endog,), + "shape": () if k_endog_effective == 1 else (k_endog,), "constraints": "Positive", - "dims": None if k_endog == 1 else (f"endog_{self.name}",), + "dims": None if k_endog_effective == 1 else (f"endog_{self.name}",), } + if self.share_states: + self.shock_names = [f"{self.name}[shared]"] + else: + self.shock_names = [f"{self.name}[{name}]" for name in self.observed_state_names] + if k_endog > 1: self.param_dims[f"sigma_{self.name}"] = (f"endog_{self.name}",) def make_symbolic_graph(self) -> None: - k_states = self.k_states // self.k_endog + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + k_states = self.k_states // k_endog_effective duration = self.duration + k_unique_states = k_states // duration - k_posdef = self.k_posdef // self.k_endog - k_endog = self.k_endog + k_posdef = 
self.k_posdef // k_endog_effective if self.remove_first_state: # In this case, parameters are normalized to sum to zero, so the current state is the negative sum of @@ -373,16 +396,18 @@ def make_symbolic_graph(self) -> None: T = pt.eye(k_states, k=1) T = pt.set_subtensor(T[-1, 0], 1) - self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog)]) + self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog_effective)]) Z = pt.zeros((1, k_states))[0, 0].set(1) - self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog)]) + self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog_effective)]) initial_states = self.make_and_register_variable( f"params_{self.name}", - shape=(k_unique_states,) if k_endog == 1 else (k_endog, k_unique_states), + shape=(k_unique_states,) + if k_endog_effective == 1 + else (k_endog_effective, k_unique_states), ) - if k_endog == 1: + if k_endog_effective == 1: self.ssm["initial_state", :] = pt.extra_ops.repeat(initial_states, duration, axis=0) else: self.ssm["initial_state", :] = pt.extra_ops.repeat( @@ -391,11 +416,11 @@ def make_symbolic_graph(self) -> None: if self.innovations: R = pt.zeros((k_states, k_posdef))[0, 0].set(1.0) - self.ssm["selection", :, :] = pt.join(0, *[R for _ in range(k_endog)]) + self.ssm["selection", :, :] = pt.join(0, *[R for _ in range(k_endog_effective)]) season_sigma = self.make_and_register_variable( - f"sigma_{self.name}", shape=() if k_endog == 1 else (k_endog,) + f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,) ) - cov_idx = ("state_cov", *np.diag_indices(k_posdef * k_endog)) + cov_idx = ("state_cov", *np.diag_indices(k_posdef * k_endog_effective)) self.ssm[cov_idx] = season_sigma**2 diff --git a/tests/statespace/models/structural/components/test_seasonality.py b/tests/statespace/models/structural/components/test_seasonality.py index d439e0bb..2e87e5a8 100644 --- a/tests/statespace/models/structural/components/test_seasonality.py +++ b/tests/statespace/models/structural/components/test_seasonality.py @@ -147,6 +147,108 @@ def test_time_seasonality_multiple_observed(rng, d, remove_first_state): np.testing.assert_allclose(matrix, expected) +def test_time_seasonality_shared_states(): + mod = st.TimeSeasonality( + season_length=3, + duration=1, + innovations=True, + name="season", + state_names=["season_1", "season_2", "season_3"], + observed_state_names=["data_1", "data_2"], + remove_first_state=False, + share_states=True, + ) + + assert mod.k_endog == 2 + assert mod.k_states == 3 + assert mod.k_posdef == 1 + + assert mod.coords["state_season"] == ["season_1", "season_2", "season_3"] + + assert mod.state_names == [ + "season_1[season_shared]", + "season_2[season_shared]", + "season_3[season_shared]", + ] + assert mod.shock_names == ["season[shared]"] + + Z, T, R = pytensor.function( + [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" + )() + + np.testing.assert_allclose(np.array([[1.0, 0.0, 0.0], [1.0, 0.0, 0.0]]), Z) + + np.testing.assert_allclose(np.array([[0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [1.0, 0.0, 0.0]]), T) + + np.testing.assert_allclose(np.array([[1.0], [0.0], [0.0]]), R) + + +def test_add_mixed_shared_not_shared_time_seasonality(): + shared_season = st.TimeSeasonality( + season_length=3, + duration=1, + innovations=True, + name="shared", + state_names=["season_1", "season_2", "season_3"], + observed_state_names=["data_1", "data_2"], + remove_first_state=False, + share_states=True, 
+ ) + individual_season = st.TimeSeasonality( + season_length=3, + duration=1, + innovations=False, + name="individual", + state_names=["season_1", "season_2", "season_3"], + observed_state_names=["data_1", "data_2"], + remove_first_state=True, + share_states=False, + ) + mod = (shared_season + individual_season).build(verbose=False) + + assert mod.k_endog == 2 + assert mod.k_states == 7 + assert mod.k_posdef == 1 + + assert mod.coords["state_shared"] == ["season_1", "season_2", "season_3"] + assert mod.coords["state_individual"] == ["season_2", "season_3"] + + assert mod.state_names == [ + "season_1[shared_shared]", + "season_2[shared_shared]", + "season_3[shared_shared]", + "season_2[data_1]", + "season_3[data_1]", + "season_2[data_2]", + "season_3[data_2]", + ] + + Z, T, R = pytensor.function( + [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" + )() + + np.testing.assert_allclose( + np.array([[1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]]), Z + ) + + np.testing.assert_allclose( + np.array( + [ + [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], + [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, -1.0, -1.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, -1.0, -1.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0], + ] + ), + T, + ) + + np.testing.assert_allclose(np.array([[1.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]]), R) + + @pytest.mark.parametrize("d1, d2", [(1, 1), (1, 3), (3, 1), (3, 3)]) def test_add_two_time_seasonality_different_observed(rng, d1, d2): mod1 = st.TimeSeasonality( From 3d79438c9f6195322d8e73e77103443c9877371f Mon Sep 17 00:00:00 2001 From: jessegrabowski Date: Tue, 5 Aug 2025 09:56:37 +0800 Subject: [PATCH 03/10] Add shared_state argument to CycleComponent --- .../models/structural/components/cycle.py | 65 +++++++---- .../structural/components/test_cycle.py | 110 ++++++++++++++++++ 2 files changed, 152 insertions(+), 23 deletions(-) diff --git a/pymc_extras/statespace/models/structural/components/cycle.py b/pymc_extras/statespace/models/structural/components/cycle.py index 7c10d152..367d746c 100644 --- a/pymc_extras/statespace/models/structural/components/cycle.py +++ b/pymc_extras/statespace/models/structural/components/cycle.py @@ -43,6 +43,11 @@ class CycleComponent(Component): Names of the observed state variables. For univariate time series, defaults to ``["data"]``. For multivariate time series, specify a list of names for each endogenous variable. + share_states: bool, default False + Whether latent states are shared across the observed states. If True, there will be only one set of latent + states, which are observed by all observed states. If False, each observed state has its own set of + latent states. This argument has no effect if `k_endog` is 1. 
+ Notes ----- The cycle component is very similar in implementation to the frequency domain seasonal component, expect that it @@ -155,6 +160,7 @@ def __init__( dampen: bool = False, innovations: bool = True, observed_state_names: list[str] | None = None, + share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] @@ -167,6 +173,7 @@ def __init__( cycle = int(cycle_length) if cycle_length is not None else "Estimate" name = f"Cycle[s={cycle}, dampen={dampen}, innovations={innovations}]" + self.share_states = share_states self.estimate_cycle_length = estimate_cycle_length self.cycle_length = cycle_length self.innovations = innovations @@ -175,8 +182,8 @@ def __init__( k_endog = len(observed_state_names) - k_states = 2 * k_endog - k_posdef = 2 * k_endog + k_states = 2 if share_states else 2 * k_endog + k_posdef = 2 if share_states else 2 * k_endog obs_state_idx = np.zeros(k_states) obs_state_idx[slice(0, k_states, 2)] = 1 @@ -193,18 +200,22 @@ def __init__( ) def make_symbolic_graph(self) -> None: + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + Z = np.array([1.0, 0.0]).reshape((1, -1)) - design_matrix = block_diag(*[Z for _ in range(self.k_endog)]) + design_matrix = block_diag(*[Z for _ in range(k_endog_effective)]) self.ssm["design", :, :] = pt.as_tensor_variable(design_matrix) # selection matrix R defines structure of innovations (always identity for cycle components) # when innovations=False, state cov Q=0, hence R @ Q @ R.T = 0 R = np.eye(2) # 2x2 identity for each cycle component - selection_matrix = block_diag(*[R for _ in range(self.k_endog)]) + selection_matrix = block_diag(*[R for _ in range(k_endog_effective)]) self.ssm["selection", :, :] = pt.as_tensor_variable(selection_matrix) init_state = self.make_and_register_variable( - f"{self.name}", shape=(self.k_endog, 2) if self.k_endog > 1 else (self.k_states,) + f"{self.name}", + shape=(k_endog_effective, 2) if k_endog_effective > 1 else (self.k_states,), ) self.ssm["initial_state", :] = init_state.ravel() @@ -219,19 +230,19 @@ def make_symbolic_graph(self) -> None: rho = 1 T = rho * _frequency_transition_block(lamb, j=1) - transition = block_diag(*[T for _ in range(self.k_endog)]) + transition = block_diag(*[T for _ in range(k_endog_effective)]) self.ssm["transition"] = pt.specify_shape(transition, (self.k_states, self.k_states)) if self.innovations: - if self.k_endog == 1: + if k_endog_effective == 1: sigma_cycle = self.make_and_register_variable(f"sigma_{self.name}", shape=()) self.ssm["state_cov", :, :] = pt.eye(self.k_posdef) * sigma_cycle**2 else: sigma_cycle = self.make_and_register_variable( - f"sigma_{self.name}", shape=(self.k_endog,) + f"sigma_{self.name}", shape=(k_endog_effective,) ) state_cov = block_diag( - *[pt.eye(2) * sigma_cycle[i] ** 2 for i in range(self.k_endog)] + *[pt.eye(2) * sigma_cycle[i] ** 2 for i in range(k_endog_effective)] ) self.ssm["state_cov"] = pt.specify_shape(state_cov, (self.k_states, self.k_states)) else: @@ -239,17 +250,25 @@ def make_symbolic_graph(self) -> None: self.ssm["state_cov", :, :] = pt.zeros((self.k_posdef, self.k_posdef)) def populate_component_properties(self): - self.state_names = [ - f"{f}_{self.name}[{var_name}]" if self.k_endog > 1 else f"{f}_{self.name}" - for var_name in self.observed_state_names - for f in ["Cos", "Sin"] - ] + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + + base_names = [f"{f}_{self.name}" for f in ["Cos", "Sin"]] + + if self.share_states: + 
self.state_names = [f"{name}[shared]" for name in base_names] + else: + self.state_names = [ + f"{name}[{var_name}]" if k_endog_effective > 1 else name + for var_name in self.observed_state_names + for name in base_names + ] self.param_names = [f"{self.name}"] - if self.k_endog == 1: + if k_endog_effective == 1: self.param_dims = {self.name: (f"state_{self.name}",)} - self.coords = {f"state_{self.name}": self.state_names} + self.coords = {f"state_{self.name}": base_names} self.param_info = { f"{self.name}": { "shape": (2,), @@ -265,7 +284,7 @@ def populate_component_properties(self): } self.param_info = { f"{self.name}": { - "shape": (self.k_endog, 2), + "shape": (k_endog_effective, 2), "constraints": None, "dims": (f"endog_{self.name}", f"state_{self.name}"), } @@ -274,22 +293,22 @@ def populate_component_properties(self): if self.estimate_cycle_length: self.param_names += [f"length_{self.name}"] self.param_info[f"length_{self.name}"] = { - "shape": () if self.k_endog == 1 else (self.k_endog,), + "shape": () if k_endog_effective == 1 else (k_endog_effective,), "constraints": "Positive, non-zero", - "dims": None if self.k_endog == 1 else f"endog_{self.name}", + "dims": None if k_endog_effective == 1 else f"endog_{self.name}", } if self.dampen: self.param_names += [f"dampening_factor_{self.name}"] self.param_info[f"dampening_factor_{self.name}"] = { - "shape": () if self.k_endog == 1 else (self.k_endog,), + "shape": () if k_endog_effective == 1 else (k_endog_effective,), "constraints": "0 < x ≤ 1", - "dims": None if self.k_endog == 1 else (f"endog_{self.name}",), + "dims": None if k_endog_effective == 1 else (f"endog_{self.name}",), } if self.innovations: self.param_names += [f"sigma_{self.name}"] - if self.k_endog == 1: + if k_endog_effective == 1: self.param_info[f"sigma_{self.name}"] = { "shape": (), "constraints": "Positive", @@ -298,7 +317,7 @@ def populate_component_properties(self): else: self.param_dims[f"sigma_{self.name}"] = (f"endog_{self.name}",) self.param_info[f"sigma_{self.name}"] = { - "shape": (self.k_endog,), + "shape": (k_endog_effective,), "constraints": "Positive", "dims": (f"endog_{self.name}",), } diff --git a/tests/statespace/models/structural/components/test_cycle.py b/tests/statespace/models/structural/components/test_cycle.py index 0332bb9c..79fc364d 100644 --- a/tests/statespace/models/structural/components/test_cycle.py +++ b/tests/statespace/models/structural/components/test_cycle.py @@ -3,6 +3,8 @@ from numpy.testing import assert_allclose from pytensor import config +from pytensor.graph.basic import explicit_graph_inputs +from scipy import linalg from pymc_extras.statespace.models import structural as st from pymc_extras.statespace.models.structural.utils import _frequency_transition_block @@ -105,6 +107,27 @@ def test_cycle_multivariate_deterministic(rng): np.testing.assert_allclose(R, expected_R) +def test_multivariate_cycle_with_shared(rng): + cycle = st.CycleComponent( + name="cycle", + cycle_length=12, + estimate_cycle_length=False, + innovations=False, + observed_state_names=["data_1", "data_2", "data_3"], + share_states=True, + ) + + assert cycle.state_names == ["Cos_cycle[shared]", "Sin_cycle[shared]"] + assert cycle.shock_names == [] + assert cycle.param_names == ["cycle"] + + params = {"cycle": np.array([1.0, 2.0], dtype=config.floatX)} + x, y = simulate_from_numpy_model(cycle, rng, params, steps=12 * 12) + + np.testing.assert_allclose(y[:, 0], y[:, 1], atol=ATOL, rtol=RTOL) + np.testing.assert_allclose(y[:, 0], y[:, 2], atol=ATOL, rtol=RTOL) + + 
def test_cycle_multivariate_with_dampening(rng): """Test multivariate cycle component with dampening.""" cycle = st.CycleComponent( @@ -286,3 +309,90 @@ def test_add_multivariate_cycle_components_with_different_observed(): for i in range(4): expected_R[2 * i : 2 * i + 2, 2 * i : 2 * i + 2] = np.eye(2) assert_allclose(R, expected_R) + + +def test_add_multivariate_shared_and_not_shared(): + cycle_shared = st.CycleComponent( + name="shared_cycle", + cycle_length=12, + estimate_cycle_length=False, + innovations=True, + observed_state_names=["gdp", "inflation", "unemployment"], + share_states=True, + ) + cycle_individual = st.CycleComponent( + name="individual_cycle", + estimate_cycle_length=True, + innovations=False, + observed_state_names=["gdp", "inflation", "unemployment"], + dampen=True, + ) + mod = (cycle_shared + cycle_individual).build(verbose=False) + + assert mod.k_endog == 3 + assert mod.k_states == 2 + 3 * 2 + assert mod.k_posdef == 2 + 3 * 2 + + expected_states = [ + "Cos_shared_cycle[shared]", + "Sin_shared_cycle[shared]", + "Cos_individual_cycle[gdp]", + "Sin_individual_cycle[gdp]", + "Cos_individual_cycle[inflation]", + "Sin_individual_cycle[inflation]", + "Cos_individual_cycle[unemployment]", + "Sin_individual_cycle[unemployment]", + ] + + assert mod.state_names == expected_states + assert mod.shock_names == expected_states[:2] + + assert mod.param_names == [ + "shared_cycle", + "sigma_shared_cycle", + "individual_cycle", + "length_individual_cycle", + "dampening_factor_individual_cycle", + "P0", + ] + + assert "endog_shared_cycle" not in mod.coords + assert mod.coords["state_shared_cycle"] == ["Cos_shared_cycle", "Sin_shared_cycle"] + assert mod.coords["state_individual_cycle"] == ["Cos_individual_cycle", "Sin_individual_cycle"] + assert mod.coords["endog_individual_cycle"] == ["gdp", "inflation", "unemployment"] + + assert mod.param_info["shared_cycle"]["dims"] == ("state_shared_cycle",) + assert mod.param_info["shared_cycle"]["shape"] == (2,) + + assert mod.param_info["sigma_shared_cycle"]["dims"] is None + assert mod.param_info["sigma_shared_cycle"]["shape"] == () + + assert mod.param_info["individual_cycle"]["dims"] == ( + "endog_individual_cycle", + "state_individual_cycle", + ) + assert mod.param_info["individual_cycle"]["shape"] == (3, 2) + + params = { + "length_individual_cycle": 12.0, + "dampening_factor_individual_cycle": 0.95, + } + outputs = [mod.ssm["transition"], mod.ssm["design"], mod.ssm["selection"]] + T, Z, R = pytensor.function( + list(explicit_graph_inputs(outputs)), + outputs, + mode="FAST_COMPILE", + )(**params) + + lamb = 2 * np.pi / 12 # dampening factor for individual cycle + transition_block = np.array( + [[np.cos(lamb), np.sin(lamb)], [-np.sin(lamb), np.cos(lamb)]], dtype=config.floatX + ) + T_expected = linalg.block_diag(transition_block, *[0.95 * transition_block] * 3) + np.testing.assert_allclose(T, T_expected) + + np.testing.assert_allclose( + Z, np.array([[1, 0, 1, 0, 0, 0, 0, 0], [1, 0, 0, 0, 1, 0, 0, 0], [1, 0, 0, 0, 0, 0, 1, 0]]) + ) + + np.testing.assert_allclose(R, np.eye(8, dtype=config.floatX)) From 126d52c45cd904825be217b84f4f6e72a56cb67f Mon Sep 17 00:00:00 2001 From: Jesse Grabowski Date: Mon, 4 Aug 2025 21:52:41 +0800 Subject: [PATCH 04/10] Add shared_state argument to FrequencySeasonality --- .../structural/components/seasonality.py | 79 +++++++++++-------- .../structural/components/test_seasonality.py | 69 ++++++++++++++++ 2 files changed, 114 insertions(+), 34 deletions(-) diff --git 
a/pymc_extras/statespace/models/structural/components/seasonality.py b/pymc_extras/statespace/models/structural/components/seasonality.py index 218ed9b8..f068f744 100644 --- a/pymc_extras/statespace/models/structural/components/seasonality.py +++ b/pymc_extras/statespace/models/structural/components/seasonality.py @@ -449,6 +449,11 @@ class FrequencySeasonality(Component): observed_state_names: list[str] | None, default None List of strings for observed state labels. If None, defaults to ["data"]. + share_states: bool, default False + Whether latent states are shared across the observed states. If True, there will be only one set of latent + states, which are observed by all observed states. If False, each observed state has its own set of + latent states. This argument has no effect if `k_endog` is 1. + Notes ----- A seasonal effect is any pattern that repeats every fixed interval. Although there are many possible ways to @@ -480,15 +485,17 @@ class FrequencySeasonality(Component): def __init__( self, - season_length, - n=None, - name=None, - innovations=True, + season_length: int, + n: int | None = None, + name: str | None = None, + innovations: bool = True, observed_state_names: list[str] | None = None, + share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] + self.share_states = share_states k_endog = len(observed_state_names) if n is None: @@ -504,18 +511,20 @@ def __init__( # If the model is completely saturated (n = s // 2), the last state will not be identified, so it shouldn't # get a parameter assigned to it and should just be fixed to zero. # Test this way (rather than n == s // 2) to catch cases when n is non-integer. - self.last_state_not_identified = self.season_length / self.n == 2.0 + self.last_state_not_identified = (self.season_length / self.n) == 2.0 self.n_coefs = k_states - int(self.last_state_not_identified) obs_state_idx = np.zeros(k_states) obs_state_idx[slice(0, k_states, 2)] = 1 - obs_state_idx = np.tile(obs_state_idx, k_endog) + obs_state_idx = np.tile(obs_state_idx, 1 if share_states else k_endog) super().__init__( name=name, k_endog=k_endog, - k_states=k_states * k_endog, - k_posdef=k_states * int(self.innovations) * k_endog, + k_states=k_states if share_states else k_states * k_endog, + k_posdef=k_states * int(self.innovations) + if share_states + else k_states * int(self.innovations) * k_endog, observed_state_names=observed_state_names, measurement_error=False, combine_hidden_states=True, @@ -524,13 +533,15 @@ def __init__( def make_symbolic_graph(self) -> None: k_endog = self.k_endog - k_states = self.k_states // k_endog - k_posdef = self.k_posdef // k_endog + k_endog_effective = 1 if self.share_states else k_endog + + k_states = self.k_states // k_endog_effective + k_posdef = self.k_posdef // k_endog_effective n_coefs = self.n_coefs Z = pt.zeros((1, k_states))[0, slice(0, k_states, 2)].set(1.0) - self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog)]) + self.ssm["design", :, :] = pt.linalg.block_diag(*[Z for _ in range(k_endog_effective)]) init_state = self.make_and_register_variable( f"params_{self.name}", shape=(n_coefs,) if k_endog == 1 else (k_endog, n_coefs) @@ -539,7 +550,7 @@ def make_symbolic_graph(self) -> None: init_state_idx = np.concatenate( [ np.arange(k_states * i, (i + 1) * k_states, dtype=int)[:n_coefs] - for i in range(k_endog) + for i in range(k_endog_effective) ], axis=0, ) @@ -548,11 +559,11 @@ def make_symbolic_graph(self) -> None: T_mats = 
[_frequency_transition_block(self.season_length, j + 1) for j in range(self.n)] T = pt.linalg.block_diag(*T_mats) - self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog)]) + self.ssm["transition", :, :] = pt.linalg.block_diag(*[T for _ in range(k_endog_effective)]) if self.innovations: sigma_season = self.make_and_register_variable( - f"sigma_{self.name}", shape=() if k_endog == 1 else (k_endog,) + f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,) ) self.ssm["selection", :, :] = pt.eye(self.k_states) self.ssm["state_cov", :, :] = pt.eye(self.k_posdef) * pt.repeat( @@ -561,35 +572,35 @@ def make_symbolic_graph(self) -> None: def populate_component_properties(self): k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog n_coefs = self.n_coefs - self.state_names = [ - f"{f}_{i}_{self.name}[{obs_state_name}]" - for obs_state_name in self.observed_state_names - for i in range(self.n) - for f in ["Cos", "Sin"] - ] - # determine which state names correspond to parameters - # all endog variables use same state structure, so we just need - # the first n_coefs state names (which may be less than total if saturated) - param_state_names = [f"{f}_{i}_{self.name}" for i in range(self.n) for f in ["Cos", "Sin"]][ - :n_coefs - ] + base_names = [f"{f}_{i}_{self.name}" for i in range(self.n) for f in ["Cos", "Sin"]] - self.param_names = [f"params_{self.name}"] + if self.share_states: + self.state_names = [f"{name}[shared]" for name in base_names] + else: + self.state_names = [ + f"{name}[{obs_state_name}]" + for obs_state_name in self.observed_state_names + for name in base_names + ] + # Trim state names if the model is saturated + param_state_names = base_names[:n_coefs] + + self.param_names = [f"params_{self.name}"] self.param_dims = { f"params_{self.name}": (f"state_{self.name}",) - if k_endog == 1 + if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}") } - self.param_info = { f"params_{self.name}": { - "shape": (n_coefs,) if k_endog == 1 else (k_endog, n_coefs), + "shape": (n_coefs,) if k_endog_effective == 1 else (k_endog_effective, n_coefs), "constraints": None, "dims": (f"state_{self.name}",) - if k_endog == 1 + if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}"), } } @@ -607,9 +618,9 @@ def populate_component_properties(self): self.param_names += [f"sigma_{self.name}"] self.shock_names = self.state_names.copy() self.param_info[f"sigma_{self.name}"] = { - "shape": () if k_endog == 1 else (k_endog,), + "shape": () if k_endog_effective == 1 else (k_endog_effective, n_coefs), "constraints": "Positive", - "dims": None if k_endog == 1 else (f"endog_{self.name}",), + "dims": None if k_endog_effective == 1 else (f"endog_{self.name}",), } - if k_endog > 1: + if k_endog_effective > 1: self.param_dims[f"sigma_{self.name}"] = (f"endog_{self.name}",) diff --git a/tests/statespace/models/structural/components/test_seasonality.py b/tests/statespace/models/structural/components/test_seasonality.py index 2e87e5a8..353ccbe2 100644 --- a/tests/statespace/models/structural/components/test_seasonality.py +++ b/tests/statespace/models/structural/components/test_seasonality.py @@ -474,6 +474,39 @@ def test_frequency_seasonality_multiple_observed(rng): np.testing.assert_allclose(Q_diag, expected_Q_diag, atol=ATOL, rtol=RTOL) +def test_frequency_seasonality_multivariate_shared_states(): + mod = st.FrequencySeasonality( + season_length=4, + n=1, + name="season", + innovations=True, + 
observed_state_names=["data_1", "data_2"], + share_states=True, + ) + + assert mod.k_endog == 2 + assert mod.k_states == 2 + assert mod.k_posdef == 2 + + assert mod.state_names == ["Cos_0_season[shared]", "Sin_0_season[shared]"] + assert mod.shock_names == ["Cos_0_season[shared]", "Sin_0_season[shared]"] + + assert mod.coords["state_season"] == ["Cos_0_season", "Sin_0_season"] + + Z, T, R = pytensor.function( + [], [mod.ssm["design"], mod.ssm["transition"], mod.ssm["selection"]], mode="FAST_COMPILE" + )() + + np.testing.assert_allclose(np.array([[1.0, 0.0], [1.0, 0.0]]), Z) + + np.testing.assert_allclose(np.array([[1.0, 0.0], [0.0, 1.0]]), R) + + lam = 2 * np.pi * 1 / 4 + np.testing.assert_allclose( + np.array([[np.cos(lam), np.sin(lam)], [-np.sin(lam), np.cos(lam)]]), T + ) + + def test_add_two_frequency_seasonality_different_observed(rng): mod1 = st.FrequencySeasonality( season_length=4, @@ -561,6 +594,42 @@ def test_add_two_frequency_seasonality_different_observed(rng): np.testing.assert_allclose(expected_T, T_v, atol=ATOL, rtol=RTOL) +def test_add_frequency_seasonality_shared_and_not_shared(): + shared_season = st.FrequencySeasonality( + season_length=4, + n=1, + name="shared_season", + innovations=True, + observed_state_names=["data_1", "data_2"], + share_states=True, + ) + + individual_season = st.FrequencySeasonality( + season_length=4, + n=2, + name="individual_season", + innovations=True, + observed_state_names=["data_1", "data_2"], + share_states=False, + ) + + mod = (shared_season + individual_season).build(verbose=False) + + assert mod.k_endog == 2 + assert mod.k_states == 10 + assert mod.k_posdef == 10 + + assert mod.coords["state_shared_season"] == [ + "Cos_0_shared_season", + "Sin_0_shared_season", + ] + assert mod.coords["state_individual_season"] == [ + "Cos_0_individual_season", + "Sin_0_individual_season", + "Cos_1_individual_season", + ] + + @pytest.mark.parametrize( "test_case", [ From bf18a3ebd8a9d0374764670bc62d5d0d808eec29 Mon Sep 17 00:00:00 2001 From: Jonathan Dekermanjian Date: Tue, 5 Aug 2025 05:09:28 -0600 Subject: [PATCH 05/10] Add shared_state argument to RegressionComponent --- .../structural/components/regression.py | 64 +++++++---- .../structural/components/test_regression.py | 102 ++++++++++++++++++ 2 files changed, 148 insertions(+), 18 deletions(-) diff --git a/pymc_extras/statespace/models/structural/components/regression.py b/pymc_extras/statespace/models/structural/components/regression.py index 89d26018..bd7ad7fa 100644 --- a/pymc_extras/statespace/models/structural/components/regression.py +++ b/pymc_extras/statespace/models/structural/components/regression.py @@ -31,6 +31,11 @@ class RegressionComponent(Component): Whether to include stochastic innovations in the regression coefficients, allowing them to vary over time. If True, coefficients follow a random walk. + share_states: bool, default False + Whether latent states are shared across the observed states. If True, there will be only one set of latent + states, which are observed by all observed states. If False, each observed state has its own set of + latent states. 
+ Notes ----- This component implements regression with exogenous variables in a structural time series @@ -107,7 +112,10 @@ def __init__( state_names: list[str] | None = None, observed_state_names: list[str] | None = None, innovations=False, + share_states: bool = False, ): + self.share_states = share_states + if observed_state_names is None: observed_state_names = ["data"] @@ -121,8 +129,8 @@ def __init__( super().__init__( name=name, k_endog=k_endog, - k_states=k_states * k_endog, - k_posdef=k_posdef * k_endog, + k_states=k_states * k_endog if not share_states else k_states, + k_posdef=k_posdef * k_endog if not share_states else k_posdef, state_names=self.state_names, observed_state_names=observed_state_names, measurement_error=False, @@ -153,10 +161,12 @@ def _handle_input_data(self, k_exog: int, state_names: list[str] | None, name) - def make_symbolic_graph(self) -> None: k_endog = self.k_endog - k_states = self.k_states // k_endog + k_endog_effective = 1 if self.share_states else k_endog + + k_states = self.k_states // k_endog_effective betas = self.make_and_register_variable( - f"beta_{self.name}", shape=(k_endog, k_states) if k_endog > 1 else (k_states,) + f"beta_{self.name}", shape=(k_endog, k_states) if k_endog_effective > 1 else (k_states,) ) regression_data = self.make_and_register_data(f"data_{self.name}", shape=(None, k_states)) @@ -164,43 +174,61 @@ def make_symbolic_graph(self) -> None: self.ssm["transition", :, :] = pt.eye(self.k_states) self.ssm["selection", :, :] = pt.eye(self.k_states) - Z = pt.linalg.block_diag(*[pt.expand_dims(regression_data, 1) for _ in range(k_endog)]) - self.ssm["design"] = pt.specify_shape( - Z, (None, k_endog, regression_data.type.shape[1] * k_endog) - ) + if self.share_states: + self.ssm["design"] = pt.specify_shape( + pt.join(1, *[pt.expand_dims(regression_data, 1) for _ in range(k_endog)]), + (None, k_endog, self.k_states), + ) + else: + Z = pt.linalg.block_diag(*[pt.expand_dims(regression_data, 1) for _ in range(k_endog)]) + self.ssm["design"] = pt.specify_shape( + Z, (None, k_endog, regression_data.type.shape[1] * k_endog) + ) if self.innovations: sigma_beta = self.make_and_register_variable( - f"sigma_beta_{self.name}", (k_states,) if k_endog == 1 else (k_endog, k_states) + f"sigma_beta_{self.name}", + (k_states,) if k_endog_effective == 1 else (k_endog, k_states), ) row_idx, col_idx = np.diag_indices(self.k_states) self.ssm["state_cov", row_idx, col_idx] = sigma_beta.ravel() ** 2 def populate_component_properties(self) -> None: k_endog = self.k_endog - k_states = self.k_states // k_endog + k_endog_effective = 1 if self.share_states else k_endog + + k_states = self.k_states // k_endog_effective - self.shock_names = self.state_names + if self.share_states: + self.shock_names = [f"{state_name}_shared" for state_name in self.state_names] + else: + self.shock_names = self.state_names self.param_names = [f"beta_{self.name}"] self.data_names = [f"data_{self.name}"] self.param_dims = { f"beta_{self.name}": (f"endog_{self.name}", f"state_{self.name}") - if k_endog > 1 + if k_endog_effective > 1 else (f"state_{self.name}",) } base_names = self.state_names - self.state_names = [ - f"{name}[{obs_name}]" for obs_name in self.observed_state_names for name in base_names - ] + + if self.share_states: + self.state_names = [f"{name}[{self.name}_shared]" for name in base_names] + else: + self.state_names = [ + f"{name}[{obs_name}]" + for obs_name in self.observed_state_names + for name in base_names + ] self.param_info = { f"beta_{self.name}": { - "shape": 
(k_endog, k_states) if k_endog > 1 else (k_states,), + "shape": (k_endog_effective, k_states) if k_endog_effective > 1 else (k_states,), "constraints": None, "dims": (f"endog_{self.name}", f"state_{self.name}") - if k_endog > 1 + if k_endog_effective > 1 else (f"state_{self.name}",), }, } @@ -223,6 +251,6 @@ def populate_component_properties(self) -> None: "shape": (k_states,), "constraints": "Positive", "dims": (f"state_{self.name}",) - if k_endog == 1 + if k_endog_effective == 1 else (f"endog_{self.name}", f"state_{self.name}"), } diff --git a/tests/statespace/models/structural/components/test_regression.py b/tests/statespace/models/structural/components/test_regression.py index e7208923..1855bb4a 100644 --- a/tests/statespace/models/structural/components/test_regression.py +++ b/tests/statespace/models/structural/components/test_regression.py @@ -7,6 +7,7 @@ from pytensor import config from pytensor import tensor as pt from pytensor.graph.basic import explicit_graph_inputs +from scipy.linalg import block_diag from pymc_extras.statespace.models import structural as st from tests.statespace.models.structural.conftest import _assert_basic_coords_correct @@ -235,3 +236,104 @@ def test_filter_scans_time_varying_design_matrix(self, rng, time_series_data, in if innovations: # Check that sigma_beta parameter is included in the prior assert "sigma_beta_exog" in prior.prior.data_vars + + +def test_regression_multiple_shared_construction(): + rc = st.RegressionComponent( + state_names=["A"], + observed_state_names=["data_1", "data_2"], + innovations=True, + share_states=True, + ) + mod = rc.build(verbose=False) + + assert mod.k_endog == 2 + assert mod.k_states == 1 + assert mod.k_posdef == 1 + + assert mod.coords["state_regression"] == ["A"] + assert mod.coords["endog_regression"] == ["data_1", "data_2"] + + assert mod.state_names == [ + "A[regression_shared]", + ] + + assert mod.shock_names == ["A_shared"] + + data = np.random.standard_normal(size=(10, 1)) + Z = mod.ssm["design"].eval({"data_regression": data}) + T = mod.ssm["transition"].eval() + R = mod.ssm["selection"].eval() + + np.testing.assert_allclose( + Z, + np.hstack( + [ + data, + data, + ] + )[:, :, np.newaxis], + ) + + np.testing.assert_allclose(T, np.array([[1.0]])) + np.testing.assert_allclose(R, np.array([[1.0]])) + + +def test_regression_multiple_shared_observed(rng): + mod = st.RegressionComponent( + state_names=["A"], + observed_state_names=["data_1", "data_2", "data_3"], + innovations=False, + share_states=True, + ) + data = np.random.standard_normal(size=(10, 1)) + + params = {"beta_regression": np.array([1.0])} + data_dict = {"data_regression": data} + x, y = simulate_from_numpy_model(mod, rng, params, data_dict, steps=data.shape[0]) + np.testing.assert_allclose(y[:, 0], y[:, 1]) + np.testing.assert_allclose(y[:, 0], y[:, 2]) + + +def test_regression_mixed_shared_and_not_shared(): + mod_1 = st.RegressionComponent( + name="individual", + state_names=["A"], + observed_state_names=["data_1", "data_2"], + ) + mod_2 = st.RegressionComponent( + name="joint", + state_names=["B", "C"], + observed_state_names=["data_1", "data_2"], + share_states=True, + ) + + mod = (mod_1 + mod_2).build(verbose=False) + + assert mod.k_endog == 2 + assert mod.k_states == 4 + assert mod.k_posdef == 4 + + assert mod.state_names == ["A[data_1]", "A[data_2]", "B[joint_shared]", "C[joint_shared]"] + assert mod.shock_names == ["A", "B_shared", "C_shared"] + + data_joint = np.random.standard_normal(size=(10, 2)) + data_individual = 
np.random.standard_normal(size=(10, 1)) + Z = mod.ssm["design"].eval({"data_joint": data_joint, "data_individual": data_individual}) + T = mod.ssm["transition"].eval() + R = mod.ssm["selection"].eval() + + np.testing.assert_allclose( + Z, + np.concat( + ( + block_diag(*[data_individual[:, np.newaxis] for _ in range(mod.k_endog)]), + np.concat((data_joint[:, np.newaxis], data_joint[:, np.newaxis]), axis=1), + ), + axis=2, + ), + ) + + np.testing.assert_allclose(T, np.eye(mod.k_states)) + + np.testing.assert_allclose(R, np.eye(mod.k_states)) From 7a8bdf1fe879c3b49f1f27c7ba52ebf6c5c9630d Mon Sep 17 00:00:00 2001 From: Jesse Grabowski Date: Tue, 5 Aug 2025 19:19:35 +0800 Subject: [PATCH 06/10] Add shared_state argument to AutoregressiveComponent --- .../structural/components/autoregressive.py | 68 ++++--- .../components/test_autoregressive.py | 175 ++++++++++++++++-- .../structural/test_against_statsmodels.py | 2 +- 3 files changed, 202 insertions(+), 43 deletions(-) diff --git a/pymc_extras/statespace/models/structural/components/autoregressive.py b/pymc_extras/statespace/models/structural/components/autoregressive.py index d522ea1f..105bcd81 100644 --- a/pymc_extras/statespace/models/structural/components/autoregressive.py +++ b/pymc_extras/statespace/models/structural/components/autoregressive.py @@ -23,6 +23,11 @@ class AutoregressiveComponent(Component): observed_state_names: list[str] | None, default None List of strings for observed state labels. If None, defaults to ["data"]. + share_states: bool, default False + Whether latent states are shared across the observed states. If True, there will be only one set of latent + states, which are observed by all observed states. If False, each observed state has its own set of + latent states. This argument has no effect if `k_endog` is 1. + Notes ----- An autoregressive component can be thought of as a way o introducing serially correlated errors into the model. 
@@ -73,45 +78,58 @@ def __init__( order: int = 1, name: str = "auto_regressive", observed_state_names: list[str] | None = None, + share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] - k_posdef = k_endog = len(observed_state_names) + k_endog = len(observed_state_names) + k_endog_effective = k_posdef = 1 if share_states else k_endog order = order_to_mask(order) ar_lags = np.flatnonzero(order).ravel().astype(int) + 1 k_states = len(order) + self.share_states = share_states self.order = order self.ar_lags = ar_lags super().__init__( name=name, k_endog=k_endog, - k_states=k_states * k_endog, + k_states=k_states * k_endog_effective, k_posdef=k_posdef, measurement_error=True, combine_hidden_states=True, observed_state_names=observed_state_names, - obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog), + obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog_effective), ) def populate_component_properties(self): - k_states = self.k_states // self.k_endog # this is also the number of AR lags + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog - self.state_names = [ - f"L{i + 1}[{state_name}]" - for state_name in self.observed_state_names - for i in range(k_states) - ] + k_states = self.k_states // k_endog_effective # this is also the number of AR lags + base_names = [f"L{i + 1}_{self.name}" for i in range(k_states)] + + if self.share_states: + self.state_names = [f"{name}[shared]" for name in base_names] + self.shock_names = [f"{self.name}[shared]"] + else: + self.state_names = [ + f"{name}[{state_name}]" + for state_name in self.observed_state_names + for name in base_names + ] + self.shock_names = [ + f"{self.name}[{obs_name}]" for obs_name in self.observed_state_names + ] - self.shock_names = [f"{self.name}[{obs_name}]" for obs_name in self.observed_state_names] self.param_names = [f"params_{self.name}", f"sigma_{self.name}"] self.param_dims = {f"params_{self.name}": (f"lag_{self.name}",)} self.coords = {f"lag_{self.name}": self.ar_lags.tolist()} - if self.k_endog > 1: + if k_endog_effective > 1: self.param_dims[f"params_{self.name}"] = ( f"endog_{self.name}", f"lag_{self.name}", @@ -140,18 +158,21 @@ def populate_component_properties(self): def make_symbolic_graph(self) -> None: k_endog = self.k_endog - k_states = self.k_states // k_endog + k_endog_effective = 1 if self.share_states else k_endog + + k_states = self.k_states // k_endog_effective k_posdef = self.k_posdef k_nonzero = int(sum(self.order)) ar_params = self.make_and_register_variable( - f"params_{self.name}", shape=(k_nonzero,) if k_endog == 1 else (k_endog, k_nonzero) + f"params_{self.name}", + shape=(k_nonzero,) if k_endog_effective == 1 else (k_endog_effective, k_nonzero), ) sigma_ar = self.make_and_register_variable( - f"sigma_{self.name}", shape=() if k_endog == 1 else (k_endog,) + f"sigma_{self.name}", shape=() if k_endog_effective == 1 else (k_endog_effective,) ) - if k_endog == 1: + if k_endog_effective == 1: T = pt.eye(k_states, k=-1) ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0]) T = T[ar_idx].set(ar_params) @@ -159,7 +180,7 @@ def make_symbolic_graph(self) -> None: else: transition_matrices = [] - for i in range(k_endog): + for i in range(k_endog_effective): T = pt.eye(k_states, k=-1) ar_idx = (np.zeros(k_nonzero, dtype="int"), np.nonzero(self.order)[0]) T = T[ar_idx].set(ar_params[i]) @@ -171,18 +192,21 @@ def make_symbolic_graph(self) -> None: self.ssm["transition", :, :] = T R = 
np.eye(k_states) - R_mask = np.full((k_states), False) + R_mask = np.full((k_states,), False) R_mask[0] = True R = R[:, R_mask] self.ssm["selection", :, :] = pt.specify_shape( - pt.linalg.block_diag(*[R for _ in range(k_endog)]), (self.k_states, self.k_posdef) + pt.linalg.block_diag(*[R for _ in range(k_endog_effective)]), (self.k_states, k_posdef) ) - Z = pt.zeros((1, k_states))[0, 0].set(1.0) - self.ssm["design", :, :] = pt.specify_shape( - pt.linalg.block_diag(*[Z for _ in range(k_endog)]), (self.k_endog, self.k_states) - ) + Zs = [pt.zeros((1, k_states))[0, 0].set(1.0) for _ in range(k_endog)] + + if self.share_states: + Z = pt.join(0, *Zs) + else: + Z = pt.linalg.block_diag(*Zs) + self.ssm["design", :, :] = pt.specify_shape(Z, (k_endog, self.k_states)) cov_idx = ("state_cov", *np.diag_indices(k_posdef)) self.ssm[cov_idx] = sigma_ar**2 diff --git a/tests/statespace/models/structural/components/test_autoregressive.py b/tests/statespace/models/structural/components/test_autoregressive.py index 9eabbe17..8f05fe96 100644 --- a/tests/statespace/models/structural/components/test_autoregressive.py +++ b/tests/statespace/models/structural/components/test_autoregressive.py @@ -15,7 +15,6 @@ def test_autoregressive_model(order, rng): ar = st.AutoregressiveComponent(order=order).build(verbose=False) - # Check coords _assert_basic_coords_correct(ar) lags = np.arange(len(order) if isinstance(order, list) else order, dtype="int") + 1 @@ -25,7 +24,7 @@ def test_autoregressive_model(order, rng): def test_autoregressive_multiple_observed_build(rng): - ar = st.AutoregressiveComponent(order=3, observed_state_names=["data_1", "data_2"]) + ar = st.AutoregressiveComponent(order=3, name="ar", observed_state_names=["data_1", "data_2"]) mod = ar.build(verbose=False) assert mod.k_endog == 2 @@ -33,18 +32,18 @@ def test_autoregressive_multiple_observed_build(rng): assert mod.k_posdef == 2 assert mod.state_names == [ - "L1[data_1]", - "L2[data_1]", - "L3[data_1]", - "L1[data_2]", - "L2[data_2]", - "L3[data_2]", + "L1_ar[data_1]", + "L2_ar[data_1]", + "L3_ar[data_1]", + "L1_ar[data_2]", + "L2_ar[data_2]", + "L3_ar[data_2]", ] - assert mod.shock_names == ["auto_regressive[data_1]", "auto_regressive[data_2]"] + assert mod.shock_names == ["ar[data_1]", "ar[data_2]"] params = { - "params_auto_regressive": np.full( + "params_ar": np.full( ( 2, sum(ar.order), @@ -52,7 +51,7 @@ def test_autoregressive_multiple_observed_build(rng): 0.5, dtype=config.floatX, ), - "sigma_auto_regressive": np.array([0.05, 0.12]), + "sigma_ar": np.array([0.05, 0.12]), } _, _, _, _, T, Z, R, _, Q = mod._unpack_statespace_with_placeholders() input_vars = explicit_graph_inputs([T, Z, R, Q]) @@ -89,6 +88,33 @@ def test_autoregressive_multiple_observed_build(rng): np.testing.assert_allclose(Q, np.diag([0.05**2, 0.12**2])) +def test_autoregressive_multiple_observed_shared(): + ar = st.AutoregressiveComponent( + order=1, + name="latent", + observed_state_names=["data_1", "data_2", "data_3"], + share_states=True, + ) + mod = ar.build(verbose=False) + + assert mod.k_endog == 3 + assert mod.k_states == 1 + assert mod.k_posdef == 1 + + assert mod.state_names == ["L1_latent[shared]"] + assert mod.shock_names == ["latent[shared]"] + assert mod.coords["lag_latent"] == [1] + assert "endog_latent" not in mod.coords + + outputs = [mod.ssm["transition"], mod.ssm["design"]] + params = {"params_latent": np.array([0.9])} + T, Z = pytensor.function(list(explicit_graph_inputs(outputs)), outputs)(**params) + + np.testing.assert_allclose(np.array([[1.0], [1.0], 
[1.0]]), Z) + + np.testing.assert_allclose(np.array([[0.9]]), T) + + def test_autoregressive_multiple_observed_data(rng): ar = st.AutoregressiveComponent(order=1, observed_state_names=["data_1", "data_2", "data_3"]) mod = ar.build(verbose=False) @@ -112,21 +138,130 @@ def test_add_autoregressive_different_observed(): mod = (mod_1 + mod_2).build(verbose=False) - print(mod.coords) - assert mod.k_endog == 2 assert mod.k_states == 7 assert mod.k_posdef == 2 assert mod.state_names == [ - "L1[data_1]", - "L1[data_2]", - "L2[data_2]", - "L3[data_2]", - "L4[data_2]", - "L5[data_2]", - "L6[data_2]", + f"L1_{mod_1.name}[data_1]", + f"L1_{mod_2.name}[data_2]", + f"L2_{mod_2.name}[data_2]", + f"L3_{mod_2.name}[data_2]", + f"L4_{mod_2.name}[data_2]", + f"L5_{mod_2.name}[data_2]", + f"L6_{mod_2.name}[data_2]", ] assert mod.shock_names == ["ar1[data_1]", "ar6[data_2]"] assert mod.coords["lag_ar1"] == [1] assert mod.coords["lag_ar6"] == [1, 2, 3, 4, 5, 6] + + +def test_autoregressive_shared_and_not_shared(): + shared = st.AutoregressiveComponent( + order=3, + name="shared_ar", + observed_state_names=["data_1", "data_2", "data_3"], + share_states=True, + ) + individual = st.AutoregressiveComponent( + order=3, + name="individual_ar", + observed_state_names=["data_1", "data_2", "data_3"], + share_states=False, + ) + + mod = (shared + individual).build(verbose=False) + + assert mod.k_endog == 3 + assert mod.k_states == 3 + 3 * 3 + assert mod.k_posdef == 4 + + assert mod.state_names == [ + "L1_shared_ar[shared]", + "L2_shared_ar[shared]", + "L3_shared_ar[shared]", + "L1_individual_ar[data_1]", + "L2_individual_ar[data_1]", + "L3_individual_ar[data_1]", + "L1_individual_ar[data_2]", + "L2_individual_ar[data_2]", + "L3_individual_ar[data_2]", + "L1_individual_ar[data_3]", + "L2_individual_ar[data_3]", + "L3_individual_ar[data_3]", + ] + + assert mod.shock_names == [ + "shared_ar[shared]", + "individual_ar[data_1]", + "individual_ar[data_2]", + "individual_ar[data_3]", + ] + assert mod.coords["lag_shared_ar"] == [1, 2, 3] + assert mod.coords["lag_individual_ar"] == [1, 2, 3] + + outputs = [mod.ssm["transition"], mod.ssm["design"], mod.ssm["selection"], mod.ssm["state_cov"]] + T, Z, R, Q = pytensor.function( + list(explicit_graph_inputs(outputs)), + outputs, + )( + **{ + "params_shared_ar": np.array([0.9, 0.8, 0.7]), + "params_individual_ar": np.full((3, 3), 0.5), + "sigma_shared_ar": np.array(0.1), + "sigma_individual_ar": np.array([0.05, 0.12, 0.22]), + } + ) + + np.testing.assert_allclose( + T, + np.array( + [ + [0.9, 0.8, 0.7, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.5], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0], + [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0], + ] + ), + ) + + np.testing.assert_allclose( + Z, + np.array( + [ + [1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0], + [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 
0.0],
+            ]
+        ),
+    )
+
+    np.testing.assert_allclose(
+        R,
+        np.array(
+            [
+                [1.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 1.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 1.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 1.0],
+                [0.0, 0.0, 0.0, 0.0],
+                [0.0, 0.0, 0.0, 0.0],
+            ]
+        ),
+    )
+
+    np.testing.assert_allclose(Q, np.diag([0.1, 0.05, 0.12, 0.22]) ** 2)
diff --git a/tests/statespace/models/structural/test_against_statsmodels.py b/tests/statespace/models/structural/test_against_statsmodels.py
index 31b4cbae..4ac3dee3 100644
--- a/tests/statespace/models/structural/test_against_statsmodels.py
+++ b/tests/statespace/models/structural/test_against_statsmodels.py
@@ -404,7 +404,7 @@ def create_structural_model_and_equivalent_statsmodel(
         components.append(comp)
 
     if autoregressive is not None:
-        ar_names = [f"L{i + 1}" for i in range(autoregressive)]
+        ar_names = [f"L{i + 1}_ar" for i in range(autoregressive)]
         params_ar = rng.normal(size=(autoregressive,)).astype(floatX)
         if autoregressive == 1:
             params_ar = params_ar.item()

From 1341c527f55bf158a7da002aa071e1603ea54824 Mon Sep 17 00:00:00 2001
From: Jesse Grabowski 
Date: Tue, 5 Aug 2025 20:23:51 +0800
Subject: [PATCH 07/10] Add shared_state argument to MeasurementError

---
 .../components/measurement_error.py           | 26 +++++++++---
 .../components/test_measurement_error.py      | 42 +++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/pymc_extras/statespace/models/structural/components/measurement_error.py b/pymc_extras/statespace/models/structural/components/measurement_error.py
index babac032..a6eb77a5 100644
--- a/pymc_extras/statespace/models/structural/components/measurement_error.py
+++ b/pymc_extras/statespace/models/structural/components/measurement_error.py
@@ -17,6 +17,10 @@ class MeasurementError(Component):
         Name of the measurement error component. Default is "MeasurementError".
     observed_state_names : list[str] | None, optional
         Names of the observed variables. If None, defaults to ["data"].
+    share_states : bool, default False
+        Whether the measurement error variance is shared across the observed states. If True, a single
+        error variance parameter is estimated and applied to every observed state. If False, each observed
+        state has its own error variance parameter. This argument has no effect if `k_endog` is 1.
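+
+        For example (the name and series labels here are purely illustrative), the following would
+        estimate a single shared ``sigma_obs`` that applies to both series, rather than one error
+        variance per series::
+
+            MeasurementError("obs", observed_state_names=["data_1", "data_2"], share_states=True)
+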
Notes ----- @@ -93,11 +97,16 @@ class MeasurementError(Component): """ def __init__( - self, name: str = "MeasurementError", observed_state_names: list[str] | None = None + self, + name: str = "MeasurementError", + observed_state_names: list[str] | None = None, + share_states: bool = False, ): if observed_state_names is None: observed_state_names = ["data"] + self.share_states = share_states + k_endog = len(observed_state_names) k_states = 0 k_posdef = 0 @@ -113,25 +122,32 @@ def __init__( ) def populate_component_properties(self): + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + self.param_names = [f"sigma_{self.name}"] self.param_dims = {} self.coords = {} - if self.k_endog > 1: + if k_endog_effective > 1: self.param_dims[f"sigma_{self.name}"] = (f"endog_{self.name}",) self.coords[f"endog_{self.name}"] = self.observed_state_names self.param_info = { f"sigma_{self.name}": { - "shape": (self.k_endog,) if self.k_endog > 1 else (), + "shape": (k_endog_effective,) if k_endog_effective > 1 else (), "constraints": "Positive", - "dims": (f"endog_{self.name}",) if self.k_endog > 1 else None, + "dims": (f"endog_{self.name}",) if k_endog_effective > 1 else None, } } def make_symbolic_graph(self) -> None: - sigma_shape = () if self.k_endog == 1 else (self.k_endog,) + k_endog = self.k_endog + k_endog_effective = 1 if self.share_states else k_endog + + sigma_shape = () if k_endog_effective == 1 else (k_endog_effective,) error_sigma = self.make_and_register_variable(f"sigma_{self.name}", shape=sigma_shape) + diag_idx = np.diag_indices(self.k_endog) idx = np.s_["obs_cov", diag_idx[0], diag_idx[1]] self.ssm[idx] = error_sigma**2 diff --git a/tests/statespace/models/structural/components/test_measurement_error.py b/tests/statespace/models/structural/components/test_measurement_error.py index ba6a654f..a20d56ad 100644 --- a/tests/statespace/models/structural/components/test_measurement_error.py +++ b/tests/statespace/models/structural/components/test_measurement_error.py @@ -1,4 +1,7 @@ import numpy as np +import pytensor + +from pytensor.graph.basic import explicit_graph_inputs from pymc_extras.statespace.models import structural as st from tests.statespace.models.structural.conftest import _assert_basic_coords_correct @@ -19,6 +22,45 @@ def test_measurement_error_multiple_observed(): assert mod.param_dims["sigma_obs"] == ("endog_obs",) +def test_measurement_error_share_states(): + mod = st.MeasurementError("obs", observed_state_names=["data_1", "data_2"], share_states=True) + mod.build(verbose=False) + + assert mod.k_endog == 2 + assert mod.param_names == ["sigma_obs", "P0"] + assert "endog_obs" not in mod.coords + + # Check that the parameter is shared across the observed states + assert mod.param_info["sigma_obs"]["shape"] == () + + outputs = mod.ssm["obs_cov"] + + H = pytensor.function(list(explicit_graph_inputs([outputs])), outputs)(sigma_obs=np.array(0.5)) + np.testing.assert_allclose(H, np.diag([0.5, 0.5]) ** 2) + + +def test_measurement_error_shared_and_not_shared(): + shared = st.MeasurementError( + "error_shared", observed_state_names=["data_1", "data_2"], share_states=True + ) + individual = st.MeasurementError("error_individual", observed_state_names=["data_1", "data_2"]) + mod = (shared + individual).build(verbose=False) + + assert mod.k_endog == 2 + assert mod.param_names == ["sigma_error_shared", "sigma_error_individual", "P0"] + assert mod.coords["endog_error_individual"] == ["data_1", "data_2"] + + assert mod.param_info["sigma_error_shared"]["shape"] == 
() + assert mod.param_info["sigma_error_individual"]["shape"] == (2,) + + outputs = mod.ssm["obs_cov"] + + H = pytensor.function(list(explicit_graph_inputs([outputs])), outputs)( + sigma_error_shared=np.array(0.5), sigma_error_individual=np.array([0.1, 0.9]) + ) + np.testing.assert_allclose(H, np.diag([0.5, 0.5]) ** 2 + np.diag([0.1, 0.9]) ** 2) + + def test_build_with_measurement_error_subset(): ll = st.LevelTrendComponent(order=2, observed_state_names=["data_1", "data_2", "data_3"]) me = st.MeasurementError("obs", observed_state_names=["data_1", "data_3"]) From 2db126013a7a70f7c7785e64380d62c47e90cdf9 Mon Sep 17 00:00:00 2001 From: Alexandre Andorra Date: Tue, 5 Aug 2025 15:37:51 -0400 Subject: [PATCH 08/10] Pass `share_states` to `super` calls --- .../statespace/models/structural/components/autoregressive.py | 1 + pymc_extras/statespace/models/structural/components/cycle.py | 1 + .../statespace/models/structural/components/level_trend.py | 1 + .../models/structural/components/measurement_error.py | 1 + .../statespace/models/structural/components/regression.py | 1 + .../statespace/models/structural/components/seasonality.py | 2 ++ 6 files changed, 7 insertions(+) diff --git a/pymc_extras/statespace/models/structural/components/autoregressive.py b/pymc_extras/statespace/models/structural/components/autoregressive.py index 105bcd81..700b9d87 100644 --- a/pymc_extras/statespace/models/structural/components/autoregressive.py +++ b/pymc_extras/statespace/models/structural/components/autoregressive.py @@ -103,6 +103,7 @@ def __init__( combine_hidden_states=True, observed_state_names=observed_state_names, obs_state_idxs=np.tile(np.r_[[1.0], np.zeros(k_states - 1)], k_endog_effective), + share_states=share_states, ) def populate_component_properties(self): diff --git a/pymc_extras/statespace/models/structural/components/cycle.py b/pymc_extras/statespace/models/structural/components/cycle.py index 19f38334..e4a25075 100644 --- a/pymc_extras/statespace/models/structural/components/cycle.py +++ b/pymc_extras/statespace/models/structural/components/cycle.py @@ -197,6 +197,7 @@ def __init__( combine_hidden_states=True, obs_state_idxs=obs_state_idx, observed_state_names=observed_state_names, + share_states=share_states, ) def make_symbolic_graph(self) -> None: diff --git a/pymc_extras/statespace/models/structural/components/level_trend.py b/pymc_extras/statespace/models/structural/components/level_trend.py index 95987ea9..edf24f5c 100644 --- a/pymc_extras/statespace/models/structural/components/level_trend.py +++ b/pymc_extras/statespace/models/structural/components/level_trend.py @@ -170,6 +170,7 @@ def __init__( obs_state_idxs=np.tile( np.array([1.0] + [0.0] * (k_states - 1)), k_endog if not share_states else 1 ), + share_states=share_states, ) def populate_component_properties(self): diff --git a/pymc_extras/statespace/models/structural/components/measurement_error.py b/pymc_extras/statespace/models/structural/components/measurement_error.py index a6eb77a5..003dfc9f 100644 --- a/pymc_extras/statespace/models/structural/components/measurement_error.py +++ b/pymc_extras/statespace/models/structural/components/measurement_error.py @@ -119,6 +119,7 @@ def __init__( measurement_error=True, combine_hidden_states=False, observed_state_names=observed_state_names, + share_states=share_states, ) def populate_component_properties(self): diff --git a/pymc_extras/statespace/models/structural/components/regression.py b/pymc_extras/statespace/models/structural/components/regression.py index bd7ad7fa..5620b1ea 
100644 --- a/pymc_extras/statespace/models/structural/components/regression.py +++ b/pymc_extras/statespace/models/structural/components/regression.py @@ -132,6 +132,7 @@ def __init__( k_states=k_states * k_endog if not share_states else k_states, k_posdef=k_posdef * k_endog if not share_states else k_posdef, state_names=self.state_names, + share_states=share_states, observed_state_names=observed_state_names, measurement_error=False, combine_hidden_states=False, diff --git a/pymc_extras/statespace/models/structural/components/seasonality.py b/pymc_extras/statespace/models/structural/components/seasonality.py index f068f744..ab0de81f 100644 --- a/pymc_extras/statespace/models/structural/components/seasonality.py +++ b/pymc_extras/statespace/models/structural/components/seasonality.py @@ -296,6 +296,7 @@ def __init__( obs_state_idxs=np.tile( np.array([1.0] + [0.0] * (k_states - 1)), 1 if share_states else k_endog ), + share_states=share_states, ) def populate_component_properties(self): @@ -525,6 +526,7 @@ def __init__( k_posdef=k_states * int(self.innovations) if share_states else k_states * int(self.innovations) * k_endog, + share_states=share_states, observed_state_names=observed_state_names, measurement_error=False, combine_hidden_states=True, From 753823cffb0c75ffdfcfc85048d4c5b25e97ebe2 Mon Sep 17 00:00:00 2001 From: Alexandre Andorra Date: Tue, 5 Aug 2025 16:43:53 -0400 Subject: [PATCH 09/10] Add shared_states flag in core.py --- pymc_extras/statespace/models/structural/core.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pymc_extras/statespace/models/structural/core.py b/pymc_extras/statespace/models/structural/core.py index 95bc957c..a2718251 100644 --- a/pymc_extras/statespace/models/structural/core.py +++ b/pymc_extras/statespace/models/structural/core.py @@ -471,6 +471,10 @@ class Component: obs_state_idxs : np.ndarray | None, optional Indices indicating which states contribute to observed variables. If None, defaults to None. + share_states : bool, optional + Whether states are shared across multiple endogenous variables in multivariate + models. When True, the same latent states affect all observed variables. + Default is False. 
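+
+        For example (the component and series names below are illustrative), the following builds a
+        single latent AR state that is loaded by both observed series, rather than one AR state per
+        series::
+
+            AutoregressiveComponent(order=1, observed_state_names=["data_1", "data_2"], share_states=True)
+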
Examples -------- @@ -512,10 +516,12 @@ def __init__( combine_hidden_states=True, component_from_sum=False, obs_state_idxs=None, + share_states: bool = False, ): self.name = name self.k_endog = k_endog self.k_states = k_states + self.share_states = share_states self.k_posdef = k_posdef self.measurement_error = measurement_error @@ -557,6 +563,7 @@ def __init__( "observed_state_names": self.observed_state_names, "combine_hidden_states": combine_hidden_states, "obs_state_idx": obs_state_idxs, + "share_states": self.share_states, } } From 4dcfc99dfe8ddd4e99c037f7486e04cb704d4de8 Mon Sep 17 00:00:00 2001 From: Alexandre Andorra Date: Tue, 5 Aug 2025 16:44:19 -0400 Subject: [PATCH 10/10] Fix cycle tests after param renaming --- .../models/structural/components/test_cycle.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/statespace/models/structural/components/test_cycle.py b/tests/statespace/models/structural/components/test_cycle.py index 6b2099f3..2371ab59 100644 --- a/tests/statespace/models/structural/components/test_cycle.py +++ b/tests/statespace/models/structural/components/test_cycle.py @@ -122,9 +122,9 @@ def test_multivariate_cycle_with_shared(rng): assert cycle.state_names == ["Cos_cycle[shared]", "Sin_cycle[shared]"] assert cycle.shock_names == [] - assert cycle.param_names == ["cycle"] + assert cycle.param_names == ["params_cycle"] - params = {"cycle": np.array([1.0, 2.0], dtype=config.floatX)} + params = {"params_cycle": np.array([1.0, 2.0], dtype=config.floatX)} x, y = simulate_from_numpy_model(cycle, rng, params, steps=12 * 12) np.testing.assert_allclose(y[:, 0], y[:, 1], atol=ATOL, rtol=RTOL) @@ -351,9 +351,9 @@ def test_add_multivariate_shared_and_not_shared(): assert mod.shock_names == expected_states[:2] assert mod.param_names == [ - "shared_cycle", + "params_shared_cycle", "sigma_shared_cycle", - "individual_cycle", + "params_individual_cycle", "length_individual_cycle", "dampening_factor_individual_cycle", "P0", @@ -364,17 +364,17 @@ def test_add_multivariate_shared_and_not_shared(): assert mod.coords["state_individual_cycle"] == ["Cos_individual_cycle", "Sin_individual_cycle"] assert mod.coords["endog_individual_cycle"] == ["gdp", "inflation", "unemployment"] - assert mod.param_info["shared_cycle"]["dims"] == ("state_shared_cycle",) - assert mod.param_info["shared_cycle"]["shape"] == (2,) + assert mod.param_info["params_shared_cycle"]["dims"] == ("state_shared_cycle",) + assert mod.param_info["params_shared_cycle"]["shape"] == (2,) assert mod.param_info["sigma_shared_cycle"]["dims"] is None assert mod.param_info["sigma_shared_cycle"]["shape"] == () - assert mod.param_info["individual_cycle"]["dims"] == ( + assert mod.param_info["params_individual_cycle"]["dims"] == ( "endog_individual_cycle", "state_individual_cycle", ) - assert mod.param_info["individual_cycle"]["shape"] == (3, 2) + assert mod.param_info["params_individual_cycle"]["shape"] == (3, 2) params = { "length_individual_cycle": 12.0,