Skip to content

Commit 668593b

Browse files
authored
Feature: Implement series-specific model selection using meta-features (#1211)
2 parents 38992ff + 237551c commit 668593b

File tree

12 files changed

+928
-131
lines changed

12 files changed

+928
-131
lines changed

ads/opctl/operator/lowcode/common/data.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,16 +19,21 @@
1919

2020

2121
class AbstractData(ABC):
22-
def __init__(self, spec, name="input_data", data=None):
22+
def __init__(self, spec, name="input_data", data=None, subset=None):
    """
    Set up the dataset wrapper, either from a ready-made object or by loading it.

    Parameters
    ----------
    spec :
        Dataset specification; kept on ``self.spec`` and handed to
        ``load_transform_ingest_data`` when no ``data`` is supplied.
    name : str, optional
        Label for this dataset, stored on ``self.name``.
    data : optional
        Already-loaded data. When given it is stored as-is and no loading
        takes place.
    subset : optional
        Series identifiers to restrict the dataset to. It is only stored here;
        ``_load_data`` forwards it to the loader.
    """
    self.Transformations = Transformations
    self._data_dict = dict()
    self.name = name
    self.spec = spec
    # Must be assigned before any loading happens: the load path reads self.subset.
    self.subset = subset
    self.data = data
    if data is None:
        self.load_transform_ingest_data(spec)
3237

3338
def get_raw_data_by_cat(self, category):
3439
mapping = self._data_transformer.get_target_category_columns_map()
@@ -72,7 +77,7 @@ def get_data_for_series(self, series_id):
7277
def _load_data(self, data_spec, **kwargs):
7378
loading_start_time = time.time()
7479
try:
75-
raw_data = load_data(data_spec)
80+
raw_data = load_data(data_spec, subset=self.subset if self.subset else None, target_category_columns=self.spec.target_category_columns)
7681
except InvalidParameterError as e:
7782
e.args = e.args + (f"Invalid Parameter: {self.name}",)
7883
raise e

ads/opctl/operator/lowcode/common/transformations.py

Lines changed: 207 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -294,3 +294,210 @@ def get_target_category_columns_map(self):
294294
def _fill_na(self, df: pd.DataFrame, na_value=0) -> pd.DataFrame:
295295
"""Fill nans in dataframe"""
296296
return df.fillna(value=na_value)
297+
298+
def build_fforms_meta_features(self, data, target_col=None, group_cols=None):
    """
    Compute FFORMS-style meta-features for each time series in ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        Input DataFrame containing time series data.
    target_col : str, optional
        Name of the column to calculate meta-features for.
        If None, ``self.target_column_name`` is used.
    group_cols : list of str, optional
        Columns identifying individual series. If None or empty,
        ``self.target_category_columns`` is used; if that is empty too, the
        whole DataFrame is treated as a single series.

    Returns
    -------
    pandas.DataFrame
        One row per series. With grouping, the group columns come first,
        followed by the ``ts_*`` meta-feature columns. Without grouping, a
        single row holding only the ``ts_*`` columns. The input rows are NOT
        included in the result.

    Raises
    ------
    ValueError
        If ``data`` is not a DataFrame, ``target_col`` is not a column of
        ``data``, or ``group_cols`` is not a list of existing column names.

    References
    ----------
    Talagala, T. S., Hyndman, R. J., & Athanasopoulos, G. (2023).
    Meta-learning how to forecast time series. Journal of Forecasting, 42(6), 1476-1501.
    """
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Input must be a pandas DataFrame")

    # Fall back to the configured target column when none is given
    if target_col is None:
        target_col = self.target_column_name
    if target_col not in data.columns:
        raise ValueError(f"Target column '{target_col}' not found in DataFrame")

    # Validate explicitly supplied group columns
    if group_cols is not None:
        if not isinstance(group_cols, list):
            raise ValueError("group_cols must be a list of column names")
        for col in group_cols:
            if col not in data.columns:
                raise ValueError(f"Group column '{col}' not found in DataFrame")

    # No explicit grouping: use the configured series columns, or none at all
    if not group_cols:
        group_cols = self.target_category_columns or []

    def calculate_series_features(series):
        """Calculate the ``ts_*`` meta-features for a single, ordered series."""
        n = len(series)
        values = series.values

        # Basic statistics
        mean = series.mean()
        std = series.std()
        variance = series.var()
        skewness = series.skew()
        kurtosis = series.kurtosis()
        cv = std / mean if mean != 0 else np.inf

        # Linear trend: solve the least-squares fit once and reuse it
        design = np.vstack([np.arange(n), np.ones(n)]).T
        trend_params = np.linalg.lstsq(design, values, rcond=None)[0]
        trend_coef = trend_params[0]
        std_residuals = np.std(values - design.dot(trend_params))

        # First and second differences
        diff1 = np.diff(values)
        diff2 = np.diff(diff1) if len(diff1) > 1 else np.array([])

        diff1_mean = np.mean(np.abs(diff1)) if len(diff1) > 0 else 0
        diff1_var = np.var(diff1) if len(diff1) > 0 else 0
        diff2_mean = np.mean(np.abs(diff2)) if len(diff2) > 0 else 0
        diff2_var = np.var(diff2) if len(diff2) > 0 else 0

        # Turning points: strict local maxima/minima, i.e. consecutive
        # differences of opposite sign (flat steps have sign 0 and never count)
        direction = np.sign(diff1)
        turning_points = int(np.sum(direction[:-1] * direction[1:] < 0))
        turning_points_rate = turning_points / (n - 2) if n > 2 else 0

        # Serial correlation
        acf1 = series.autocorr(lag=1) if n > 1 else 0
        acf2 = series.autocorr(lag=2) if n > 2 else 0
        acf10 = series.autocorr(lag=10) if n > 10 else 0

        # Seasonality features (a period of 12 observations is assumed)
        seasonal_strength = 0
        seasonal_peak_strength = 0
        if n >= 12:
            seasonal_acfs = [
                abs(series.autocorr(lag=lag)) for lag in (12, 24, 36) if n > lag
            ]
            seasonal_peak_strength = max(seasonal_acfs) if seasonal_acfs else 0

            ma = series.rolling(window=12, center=True).mean()
            seasonal_comp = series - ma
            total_var = np.var(series)
            # A flat series has zero variance; report no seasonality instead of 0/0
            seasonal_strength = (
                1 - np.var(seasonal_comp.dropna()) / total_var if total_var > 0 else 0
            )

        # Stability: how often the series crosses its own mean
        values_above_mean = values >= mean
        crossing_points = np.sum(values_above_mean[1:] != values_above_mean[:-1])
        crossing_rate = crossing_points / (n - 1) if n > 1 else 0

        # Nonlinearity: residual sum of squares of a cubic regression of
        # y[t] on y[t-1]. The residuals are computed explicitly because the
        # residual array returned by np.linalg.lstsq is EMPTY when the system
        # is not over-determined or is rank deficient (e.g. n == 4 or a
        # constant series), which previously raised IndexError.
        if n > 3:
            lagged = values[:-1].reshape(-1, 1).astype(float)
            target = values[1:]
            cubic_design = np.hstack([lagged, lagged**2, lagged**3])
            cubic_params = np.linalg.lstsq(cubic_design, target, rcond=None)[0]
            cubic_resid = target - cubic_design.dot(cubic_params)
            nonlinearity = float(np.dot(cubic_resid, cubic_resid))
        else:
            nonlinearity = 0

        # Long-term trend: difference between second-half and first-half means
        if n >= 10:
            mid = n // 2
            trend_change = np.mean(values[mid:]) - np.mean(values[:mid])
        else:
            trend_change = 0

        # Step changes and spikes
        step_changes = np.abs(diff1).max() if len(diff1) > 0 else 0
        spikes = np.sum(np.abs(values - mean) > 2 * std) / n if std != 0 else 0

        # Hurst-like exponent from a variance ratio of lagged differences
        lag = min(10, n // 2)
        variance_ratio = 0
        if n > lag:
            denominator = lag * np.var(series.diff())
            # Skip the ratio when the one-step differences have no variance
            if denominator > 0:
                variance_ratio = np.var(series.diff(lag)) / denominator
        hurst = (
            np.log(variance_ratio) / (2 * np.log(lag))
            if variance_ratio > 0 and lag > 1
            else 0
        )

        # Entropy of the histogram density values
        hist, _ = np.histogram(series, bins='auto', density=True)
        entropy = -np.sum(hist[hist > 0] * np.log(hist[hist > 0]))

        return pd.Series({
            'ts_n_obs': n,
            'ts_mean': mean,
            'ts_std': std,
            'ts_variance': variance,
            'ts_cv': cv,
            'ts_skewness': skewness,
            'ts_kurtosis': kurtosis,
            'ts_trend': trend_coef,
            'ts_trend_change': trend_change,
            'ts_std_residuals': std_residuals,
            'ts_turning_points_rate': turning_points_rate,
            'ts_seasonal_strength': seasonal_strength,
            'ts_seasonal_peak_strength': seasonal_peak_strength,
            'ts_acf1': acf1,
            'ts_acf2': acf2,
            'ts_acf10': acf10,
            'ts_crossing_rate': crossing_rate,
            'ts_diff1_mean': diff1_mean,
            'ts_diff1_variance': diff1_var,
            'ts_diff2_mean': diff2_mean,
            'ts_diff2_variance': diff2_var,
            'ts_nonlinearity': nonlinearity,
            'ts_step_max': step_changes,
            'ts_spikes_rate': spikes,
            'ts_hurst': hurst,
            'ts_entropy': entropy
        })

    date_col = self.dt_column_name if self.dt_column_name else 'Date'
    has_date = date_col in data.columns

    if group_cols:
        features = []
        for name, group in data.groupby(group_cols):
            # Features depend on observation order, so sort each series by date
            if has_date:
                group = group.sort_values(date_col)
            # groupby yields a scalar or a tuple key depending on pandas version
            keys = name if isinstance(name, tuple) else (name,)
            feature_row = dict(zip(group_cols, keys))
            feature_row.update(calculate_series_features(group[target_col]).to_dict())
            features.append(feature_row)
        # One row per series: group columns followed by the meta-features
        return pd.DataFrame(features)

    # Single series: sort by date if available and return a one-row DataFrame
    if has_date:
        data = data.sort_values(date_col)
    return pd.DataFrame([calculate_series_features(data[target_col])])

ads/opctl/operator/lowcode/common/utils.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,14 @@ def load_data(data_spec, storage_options=None, **kwargs):
124124
data = data[columns]
125125
if limit:
126126
data = data[:limit]
127+
# Filtering by subset if provided
128+
subset = kwargs.get('subset', None)
129+
if subset is not None:
130+
target_category_columns = kwargs.get('target_category_columns', None)
131+
mask = False
132+
for col in target_category_columns:
133+
mask = mask | data[col].isin(subset)
134+
data = data[mask]
127135
return data
128136

129137

ads/opctl/operator/lowcode/forecast/__main__.py

Lines changed: 53 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,17 +3,20 @@
33
# Copyright (c) 2023, 2025 Oracle and/or its affiliates.
44
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
55

6+
import copy
67
import json
78
import os
89
import sys
910
from typing import Dict, List
1011

12+
import pandas as pd
1113
import yaml
1214

1315
from ads.opctl import logger
1416
from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
1517
from ads.opctl.operator.common.utils import _parse_input_args
1618

19+
from .const import AUTO_SELECT_SERIES
1720
from .model.forecast_datasets import ForecastDatasets, ForecastResults
1821
from .operator_config import ForecastOperatorConfig
1922
from .whatifserve import ModelDeploymentManager
@@ -24,9 +27,56 @@ def operate(operator_config: ForecastOperatorConfig) -> ForecastResults:
2427
from .model.factory import ForecastOperatorModelFactory
2528

2629
datasets = ForecastDatasets(operator_config)
27-
results = ForecastOperatorModelFactory.get_model(
28-
operator_config, datasets
29-
).generate_report()
30+
model = ForecastOperatorModelFactory.get_model(operator_config, datasets)
31+
32+
if operator_config.spec.model == AUTO_SELECT_SERIES and hasattr(
33+
operator_config.spec, "meta_features"
34+
):
35+
# For AUTO_SELECT_SERIES, handle each series with its specific model
36+
meta_features = operator_config.spec.meta_features
37+
results = ForecastResults()
38+
sub_results_list = []
39+
40+
# Group the data by selected model
41+
for model_name in meta_features["selected_model"].unique():
42+
# Get series that use this model
43+
series_groups = meta_features[meta_features["selected_model"] == model_name]
44+
45+
# Create a sub-config for this model
46+
sub_config = copy.deepcopy(operator_config)
47+
sub_config.spec.model = model_name
48+
49+
# Create sub-datasets for these series
50+
sub_datasets = ForecastDatasets(
51+
operator_config,
52+
subset=series_groups[operator_config.spec.target_category_columns]
53+
.values.flatten()
54+
.tolist(),
55+
)
56+
57+
# Get and run the appropriate model
58+
sub_model = ForecastOperatorModelFactory.get_model(sub_config, sub_datasets)
59+
sub_result_df, sub_elapsed_time = sub_model.build_model()
60+
sub_results = sub_model.generate_report(
61+
result_df=sub_result_df,
62+
elapsed_time=sub_elapsed_time,
63+
save_sub_reports=True,
64+
)
65+
sub_results_list.append(sub_results)
66+
67+
# results_df = pd.concat([results_df, sub_result_df], ignore_index=True, axis=0)
68+
# elapsed_time += sub_elapsed_time
69+
# Merge all sub_results into a single ForecastResults object
70+
if sub_results_list:
71+
results = sub_results_list[0]
72+
for sub_result in sub_results_list[1:]:
73+
results.merge(sub_result)
74+
else:
75+
results = None
76+
77+
else:
78+
# For other cases, use the single selected model
79+
results = model.generate_report()
3080
# saving to model catalog
3181
spec = operator_config.spec
3282
if spec.what_if_analysis and datasets.additional_data:

ads/opctl/operator/lowcode/forecast/const.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -89,4 +89,5 @@ class ForecastOutputColumns(ExtendedEnum):
8989
PROPHET_INTERNAL_DATE_COL = "ds"
9090
RENDER_LIMIT = 5000
9191
AUTO_SELECT = "auto-select"
92+
AUTO_SELECT_SERIES = "auto-select-series"
9293
BACKTEST_REPORT_NAME = "back_test.csv"

0 commit comments

Comments
 (0)