@@ -43,6 +43,11 @@ class CycleComponent(Component):
43
43
Names of the observed state variables. For univariate time series, defaults to ``["data"]``.
44
44
For multivariate time series, specify a list of names for each endogenous variable.
45
45
46
+ share_states: bool, default False
47
+ Whether latent states are shared across the observed states. If True, there will be only one set of latent
48
+ states, which are observed by all observed states. If False, each observed state has its own set of
49
+ latent states. This argument has no effect if `k_endog` is 1.
50
+
46
51
Notes
47
52
-----
48
53
The cycle component is very similar in implementation to the frequency domain seasonal component, expect that it
@@ -155,6 +160,7 @@ def __init__(
155
160
dampen : bool = False ,
156
161
innovations : bool = True ,
157
162
observed_state_names : list [str ] | None = None ,
163
+ share_states : bool = False ,
158
164
):
159
165
if observed_state_names is None :
160
166
observed_state_names = ["data" ]
@@ -167,6 +173,7 @@ def __init__(
167
173
cycle = int (cycle_length ) if cycle_length is not None else "Estimate"
168
174
name = f"Cycle[s={ cycle } , dampen={ dampen } , innovations={ innovations } ]"
169
175
176
+ self .share_states = share_states
170
177
self .estimate_cycle_length = estimate_cycle_length
171
178
self .cycle_length = cycle_length
172
179
self .innovations = innovations
@@ -175,8 +182,8 @@ def __init__(
175
182
176
183
k_endog = len (observed_state_names )
177
184
178
- k_states = 2 * k_endog
179
- k_posdef = 2 * k_endog
185
+ k_states = 2 if share_states else 2 * k_endog
186
+ k_posdef = 2 if share_states else 2 * k_endog
180
187
181
188
obs_state_idx = np .zeros (k_states )
182
189
obs_state_idx [slice (0 , k_states , 2 )] = 1
@@ -190,21 +197,26 @@ def __init__(
190
197
combine_hidden_states = True ,
191
198
obs_state_idxs = obs_state_idx ,
192
199
observed_state_names = observed_state_names ,
200
+ share_states = share_states ,
193
201
)
194
202
195
203
def make_symbolic_graph (self ) -> None :
204
+ k_endog = self .k_endog
205
+ k_endog_effective = 1 if self .share_states else k_endog
206
+
196
207
Z = np .array ([1.0 , 0.0 ]).reshape ((1 , - 1 ))
197
- design_matrix = block_diag (* [Z for _ in range (self . k_endog )])
208
+ design_matrix = block_diag (* [Z for _ in range (k_endog_effective )])
198
209
self .ssm ["design" , :, :] = pt .as_tensor_variable (design_matrix )
199
210
200
211
# selection matrix R defines structure of innovations (always identity for cycle components)
201
212
# when innovations=False, state cov Q=0, hence R @ Q @ R.T = 0
202
213
R = np .eye (2 ) # 2x2 identity for each cycle component
203
- selection_matrix = block_diag (* [R for _ in range (self . k_endog )])
214
+ selection_matrix = block_diag (* [R for _ in range (k_endog_effective )])
204
215
self .ssm ["selection" , :, :] = pt .as_tensor_variable (selection_matrix )
205
216
206
217
init_state = self .make_and_register_variable (
207
- f"params_{ self .name } " , shape = (self .k_endog , 2 ) if self .k_endog > 1 else (self .k_states ,)
218
+ f"params_{ self .name } " ,
219
+ shape = (k_endog_effective , 2 ) if k_endog_effective > 1 else (self .k_states ,),
208
220
)
209
221
self .ssm ["initial_state" , :] = init_state .ravel ()
210
222
@@ -219,37 +231,45 @@ def make_symbolic_graph(self) -> None:
219
231
rho = 1
220
232
221
233
T = rho * _frequency_transition_block (lamb , j = 1 )
222
- transition = block_diag (* [T for _ in range (self . k_endog )])
234
+ transition = block_diag (* [T for _ in range (k_endog_effective )])
223
235
self .ssm ["transition" ] = pt .specify_shape (transition , (self .k_states , self .k_states ))
224
236
225
237
if self .innovations :
226
- if self . k_endog == 1 :
238
+ if k_endog_effective == 1 :
227
239
sigma_cycle = self .make_and_register_variable (f"sigma_{ self .name } " , shape = ())
228
240
self .ssm ["state_cov" , :, :] = pt .eye (self .k_posdef ) * sigma_cycle ** 2
229
241
else :
230
242
sigma_cycle = self .make_and_register_variable (
231
- f"sigma_{ self .name } " , shape = (self . k_endog ,)
243
+ f"sigma_{ self .name } " , shape = (k_endog_effective ,)
232
244
)
233
245
state_cov = block_diag (
234
- * [pt .eye (2 ) * sigma_cycle [i ] ** 2 for i in range (self . k_endog )]
246
+ * [pt .eye (2 ) * sigma_cycle [i ] ** 2 for i in range (k_endog_effective )]
235
247
)
236
248
self .ssm ["state_cov" ] = pt .specify_shape (state_cov , (self .k_states , self .k_states ))
237
249
else :
238
250
# explicitly set state cov to 0 when no innovations
239
251
self .ssm ["state_cov" , :, :] = pt .zeros ((self .k_posdef , self .k_posdef ))
240
252
241
253
def populate_component_properties (self ):
242
- self .state_names = [
243
- f"{ f } _{ self .name } [{ var_name } ]" if self .k_endog > 1 else f"{ f } _{ self .name } "
244
- for var_name in self .observed_state_names
245
- for f in ["Cos" , "Sin" ]
246
- ]
254
+ k_endog = self .k_endog
255
+ k_endog_effective = 1 if self .share_states else k_endog
256
+
257
+ base_names = [f"{ f } _{ self .name } " for f in ["Cos" , "Sin" ]]
258
+
259
+ if self .share_states :
260
+ self .state_names = [f"{ name } [shared]" for name in base_names ]
261
+ else :
262
+ self .state_names = [
263
+ f"{ name } [{ var_name } ]" if k_endog_effective > 1 else name
264
+ for var_name in self .observed_state_names
265
+ for name in base_names
266
+ ]
247
267
248
268
self .param_names = [f"params_{ self .name } " ]
249
269
250
- if self . k_endog == 1 :
270
+ if k_endog_effective == 1 :
251
271
self .param_dims = {f"params_{ self .name } " : (f"state_{ self .name } " ,)}
252
- self .coords = {f"state_{ self .name } " : self . state_names }
272
+ self .coords = {f"state_{ self .name } " : base_names }
253
273
self .param_info = {
254
274
f"params_{ self .name } " : {
255
275
"shape" : (2 ,),
@@ -265,7 +285,7 @@ def populate_component_properties(self):
265
285
}
266
286
self .param_info = {
267
287
f"params_{ self .name } " : {
268
- "shape" : (self . k_endog , 2 ),
288
+ "shape" : (k_endog_effective , 2 ),
269
289
"constraints" : None ,
270
290
"dims" : (f"endog_{ self .name } " , f"state_{ self .name } " ),
271
291
}
@@ -274,22 +294,22 @@ def populate_component_properties(self):
274
294
if self .estimate_cycle_length :
275
295
self .param_names += [f"length_{ self .name } " ]
276
296
self .param_info [f"length_{ self .name } " ] = {
277
- "shape" : () if self . k_endog == 1 else (self . k_endog ,),
297
+ "shape" : () if k_endog_effective == 1 else (k_endog_effective ,),
278
298
"constraints" : "Positive, non-zero" ,
279
- "dims" : None if self . k_endog == 1 else (f"endog_{ self .name } " ,),
299
+ "dims" : None if k_endog_effective == 1 else (f"endog_{ self .name } " ,),
280
300
}
281
301
282
302
if self .dampen :
283
303
self .param_names += [f"dampening_factor_{ self .name } " ]
284
304
self .param_info [f"dampening_factor_{ self .name } " ] = {
285
- "shape" : () if self . k_endog == 1 else (self . k_endog ,),
305
+ "shape" : () if k_endog_effective == 1 else (k_endog_effective ,),
286
306
"constraints" : "0 < x ≤ 1" ,
287
- "dims" : None if self . k_endog == 1 else (f"endog_{ self .name } " ,),
307
+ "dims" : None if k_endog_effective == 1 else (f"endog_{ self .name } " ,),
288
308
}
289
309
290
310
if self .innovations :
291
311
self .param_names += [f"sigma_{ self .name } " ]
292
- if self . k_endog == 1 :
312
+ if k_endog_effective == 1 :
293
313
self .param_info [f"sigma_{ self .name } " ] = {
294
314
"shape" : (),
295
315
"constraints" : "Positive" ,
@@ -298,7 +318,7 @@ def populate_component_properties(self):
298
318
else :
299
319
self .param_dims [f"sigma_{ self .name } " ] = (f"endog_{ self .name } " ,)
300
320
self .param_info [f"sigma_{ self .name } " ] = {
301
- "shape" : (self . k_endog ,),
321
+ "shape" : (k_endog_effective ,),
302
322
"constraints" : "Positive" ,
303
323
"dims" : (f"endog_{ self .name } " ,),
304
324
}
0 commit comments