diff --git a/.github/agents/api-backward-compatibility-specialist.md b/.github/agents/api-backward-compatibility-specialist.md index 9486623c8e..3c677f0c21 100644 --- a/.github/agents/api-backward-compatibility-specialist.md +++ b/.github/agents/api-backward-compatibility-specialist.md @@ -494,3 +494,11 @@ After each assignment: Change: - Added guidance on ``` + +### Lessons Learned + +**Session 2025 (PR #2176 — annotation regressors forecasting config)**: + +- **New pipeline config keys are additive (backward compatible)**: `future-annotation-regressors` uses `load_default=[]`, so existing configs without this key continue to work. This is the correct pattern for new optional pipeline features. When reviewing new schema fields, verify `load_default` (not `required=True`) is set for backward compatibility. +- **Forecaster `_clean_parameters` is not a regression risk for new config keys**: New pipeline config keys left in `_clean_parameters` are preserved in DataSource attributes. New keys added to the removal list are dropped. Neither path is a breaking change for existing clients; both are intentional design choices. The API Specialist should verify only that the key naming (kebab-case via `data_key`) is consistent with existing keys — it is. +- **`data_key` kebab-case is the API surface**: The external-facing key is the `data_key` value (`future-annotation-regressors`), not the Python attribute name (`future_annotation_regressors`). Clients sending JSON configs use the kebab-case form. Never check only the Python attribute name when reviewing API contracts for forecasting/reporting config fields. diff --git a/.github/agents/architecture-domain-specialist.md b/.github/agents/architecture-domain-specialist.md index 33b4570296..98f1b47864 100644 --- a/.github/agents/architecture-domain-specialist.md +++ b/.github/agents/architecture-domain-specialist.md @@ -604,3 +604,10 @@ After each assignment: - **Schema parity gap**: The PR added `account_id` to `BeliefsSearchConfigSchema` but not to `Input` (io.py). These two schemas both expose `Sensor.search_beliefs` parameters; omitting a parameter from one creates a silent gap. The architecture agent must check both schemas on any search_beliefs parameter addition. - **Documentation vs. implementation mismatch**: The `reporting.rst` docs stated reporters can filter by `account_id`, but this only works if `Input` also has the field. Docs that outrun schema support mislead users. Always verify the full schema chain before documenting a feature. - **DataSource account_id=None for non-user sources**: The existing invariant (reporters/schedulers/forecasters have `account_id=None`) limits the usefulness of `account_id` filtering: it only matches user-type sources. PRs adding `account_id` filters should either document this limitation explicitly or reconsider the invariant. + +**Session 2025 (PR #2176 — annotation regressors for forecasting pipeline)**: + +- **Duck-type proxy pattern for non-sensor pipeline inputs**: `_AnnotationRegressorProxy` in `pipelines/base.py` provides `event_resolution` and `name` attributes so annotation data can flow through `detect_and_fill_missing_values` without being a real `Sensor`. When reviewing similar PRs, check that the proxy only exposes the attributes actually accessed (not `.id` as integer, `.unit`, etc.). The pattern is valid as long as the method being reused doesn't access attributes the proxy lacks. +- **Annotation query parity**: `query_account_annotations` was added to mirror `query_asset_annotations`. When a new query function is added to `annotations.py`, verify both account-level and asset-level variants are kept consistent (same parameter list, same filter order). +- **Pipeline config schema isolation**: `future-annotation-regressors` is correctly placed only in `TrainPredictPipelineConfigSchema` (a pipeline config key), not in reporter `Input` or `BeliefsSearchConfigSchema` (search params). Distinguish between: pipeline config keys (train/predict behavior), search parameters (timely_beliefs query filters), and reporter input specs. These are separate concerns with separate schemas. +- **`_clean_parameters` does not remove annotation regressors**: The `future-annotation-regressors` key is intentionally preserved in DataSource attributes because it's needed for deterministic model retraining. When a new pipeline config key is added, decide explicitly: should it be removed from persisted attributes (add to `_clean_parameters` removal list) or preserved (leave it out)? diff --git a/.github/agents/coordinator.md b/.github/agents/coordinator.md index bc5a593a03..97c0f9d38e 100644 --- a/.github/agents/coordinator.md +++ b/.github/agents/coordinator.md @@ -661,3 +661,26 @@ If any agent hasn't self-improved, Lead must: - Test Specialist must cover all model classes that receive the new parameter **Additional gap**: `account_id` for non-user DataSources (reporters, schedulers, forecasters) remains `None`. The filter therefore only matches user-type sources. This architectural constraint (documented in Architecture Specialist) limits the feature's utility and should be prominently noted in documentation whenever `account_id` filtering is described. + +### Session 2025: Annotation Regressors Feature (PR #2176) + +**Context**: Feature adding `future-annotation-regressors` pipeline config, `holidays-by-package` CLI command, and `--timezone` to `add_holidays`. Reviewed post-session by Coordinator. + +**Delegation observation**: All 7 commits authored by `copilot-swe-agent[bot]` — single-agent execution. No specialist agents engaged. This is the recurring delegation failure pattern. + +**Code quality observation**: Despite single-agent execution, the implementation quality is high: +- `query_account_annotations` correctly mirrors `query_asset_annotations` +- `_AnnotationRegressorProxy` duck-type proxy is minimal and correct +- Schema `data_key` kebab-case is consistent with existing fields +- `_config.get("future_annotation_regressors")` correctly accesses Marshmallow-deserialized snake_case key +- `holidays>=0.57` dependency has no known CVEs +- Tests are meaningful with appropriately conservative assertion bounds + +**New patterns documented** (agent instructions updated): +- Architecture Specialist: duck-type proxy pattern, annotation query parity, pipeline config key isolation, `_clean_parameters` retention decision +- Data & Time Specialist: `--timezone` recommendation, UTC-naive convention for annotation-to-pipeline loading, DST boundary risk for full-day annotations +- API Specialist: `load_default=[]` backward compatibility pattern, `data_key` as API surface (not Python attribute name) + +**Governance gap to monitor**: The `future-annotation-regressors` key is preserved in DataSource attributes (not cleaned out). This means repeated calls to `flexmeasures add forecasts` with the same config will produce DataSources with this key in their attributes. This is intentional for retraining but creates storage overhead at scale. No action needed now, but flag if this becomes a performance concern (see Performance Specialist). + +**One open question**: If both `account_id` and `asset_id` are omitted from an annotation regressor spec, the pipeline logs a warning and returns empty data — the forecast proceeds without that regressor. This is silent degradation. A future improvement could make this a validation error at schema load time. Not blocking for this PR. diff --git a/.github/agents/data-time-semantics-specialist.md b/.github/agents/data-time-semantics-specialist.md index fb466d0cb2..988922f536 100644 --- a/.github/agents/data-time-semantics-specialist.md +++ b/.github/agents/data-time-semantics-specialist.md @@ -194,3 +194,11 @@ After each assignment: Change: - Added guidance on ``` + +### Lessons Learned + +**Session 2025 (PR #2176 — annotation regressors, timezone-aware holiday import)**: + +- **Holiday CLI commands need `--timezone`**: Without `--timezone`, `flexmeasures add holidays` and `flexmeasures add holidays-by-package` store annotations at UTC midnight. In CET (UTC+1), this means a holiday appearing at 01:00 local time in charts — one hour off. Always recommend `--timezone Europe/Amsterdam` (or equivalent) when documenting holiday import. The warning message in the CLI also surfaces this, but documentation must reinforce it. +- **Annotation-to-pipeline UTC-naive convention**: When loading annotation timestamps into the forecasting pipeline, convert tz-aware datetimes to UTC-naive using `.tz_convert("UTC").tz_localize(None)`. This matches the convention established in `load_data_all_beliefs`. Failure to do this causes merge/alignment failures when joining annotation data with sensor data that uses the same convention. +- **DST boundary risk for full-day annotations**: Holiday annotations span exactly one calendar day (e.g., `start=2024-03-31T00:00 CET`, `end=2024-04-01T00:00 CEST`). If stored tz-aware, the `end - start` interval is 23 hours across the spring-forward transition. If stored at UTC midnight (no timezone), `end - start` is always exactly 24 hours. Validate this is handled consistently when querying multi-year ranges spanning DST transitions. diff --git a/documentation/changelog.rst b/documentation/changelog.rst index 14a4139ecf..239c33ad5a 100644 --- a/documentation/changelog.rst +++ b/documentation/changelog.rst @@ -18,6 +18,9 @@ New features * New ``GET /api/v3_0/sources`` endpoint to list accessible data sources and defined types, with ``only_latest=true`` by default to return only the most recent version per source [see `PR #2126 `_] * Add support for filtering sensor data GET requests by ``source-type`` on ``/api/v3_0/sensors//data`` [see `PR #2127 `_] * Making monitoring alerts more flexible: allow ``flexmeasures monitor`` alerts to target one or more user IDs or email addresses with ``--recipient``; ``flexmeasures monitor last-seen`` can now narrow monitored users to one or more accounts with ``--account`` or to client accounts with ``--consultancy`` [see `PR #2158 `_] +* Add ``--timezone`` option to ``flexmeasures add holidays`` to store holiday annotations at local midnight; defaults to the ``FLEXMEASURES_TIMEZONE`` config setting (note: ``--year`` is now required) [see `PR #2178 `_] +* Merge ``flexmeasures add holidays-by-package`` into ``flexmeasures add holidays``; use ``--subdiv`` or ``--category`` to automatically switch to the ``holidays`` Python package, or ``--calendar-class``/``--calendar-kwargs`` for specific workalendar classes such as ``NetherlandsWithSchoolHolidays`` [see `PR #2178 `_] +* Add ``annotation-regressors`` field to the forecasting pipeline config schema (renamed from ``future-annotation-regressors``), with structured ``account``/``asset``/``sensor`` and ``annotation-type`` keys, and support for sensor annotations [see `PR #2178 `_] Infrastructure / Support ---------------------- diff --git a/documentation/concepts/annotations.rst b/documentation/concepts/annotations.rst index 32bb4a07b3..92de53a00b 100644 --- a/documentation/concepts/annotations.rst +++ b/documentation/concepts/annotations.rst @@ -34,7 +34,9 @@ Annotations are particularly useful for: **Forecasting and Scheduling** Holiday annotations help forecasting algorithms understand when energy consumption patterns deviate from normal patterns. - FlexMeasures can automatically import public holidays using the ``flexmeasures add holidays`` command. + FlexMeasures can automatically import public holidays using ``flexmeasures add holidays`` (workalendar, + default) or the ``holidays`` package (supports school holidays for selected countries). + These can then be used directly as annotation regressors in the forecasting pipeline — see :ref:`forecasting`. **Data Quality Tracking** Mark periods with known sensor issues, data gaps, or quality problems using ``error`` or ``warning`` type annotations. @@ -386,19 +388,67 @@ You can target accounts, assets, or sensors: flexmeasures add annotation --account-id 1 --content "..." --start "..." --end "..." -**Holiday import command:** +**Holiday import:** -FlexMeasures can automatically import public holidays using the `workalendar `_ library: +The ``flexmeasures add holidays`` command supports both the `workalendar `_ library (default) and the `holidays `_ package (for school holidays and additional subdivisions). Always pass ``--timezone`` matching the country's timezone so annotations are stored at local midnight rather than UTC midnight. -.. code-block:: bash +.. tip:: - # Add holidays for a specific account - flexmeasures add holidays --account-id 1 --year 2025 --country NL - - # Add holidays for an asset - flexmeasures add holidays --asset-id 5 --year 2025 --country DE + Omitting ``--timezone`` causes annotations to be stored at UTC midnight, which may make + holidays appear at the wrong local hour in charts (e.g. 1 AM or 2 AM in CET/CEST). + +.. tabs:: + + .. tab:: workalendar (public holidays) + + Uses the `workalendar `_ library. The default + when no ``--subdiv`` or ``--category`` is specified. + + .. code-block:: bash + + # Add NL public holidays for 2025, stored at Amsterdam midnight + flexmeasures add holidays --year 2025 --country NL --account 1 --timezone Europe/Amsterdam + + # Add German public holidays (federal level) + flexmeasures add holidays --year 2025 --country DE --asset 5 --timezone Europe/Berlin + + .. tab:: workalendar (specific calendar class) + + Use ``--calendar-class`` to access a specific workalendar class not available via a plain + country code, such as regional school holiday calendars. + + .. code-block:: bash + + # Netherlands school holidays for the "north" region in 2024 + flexmeasures add holidays --year 2024 \ + --calendar-class workalendar.europe.netherlands.NetherlandsWithSchoolHolidays \ + --calendar-kwargs '{"region": "north"}' \ + --account 1 --timezone Europe/Amsterdam + + .. tab:: holidays package (school holidays) + + Use ``--subdiv`` or ``--category school`` to automatically switch to the + `holidays `_ package, which supports school + holidays for selected countries. + + .. code-block:: bash + + # Bavaria school holidays for 2024 + flexmeasures add holidays --year 2024 --country DE --subdiv BY --category school \ + --account 1 --timezone Europe/Berlin + + # Dutch public holidays via the holidays package + flexmeasures add holidays --year 2025 --country NL --package holidays \ + --account 1 --timezone Europe/Amsterdam + + Key options when using the holidays package: + + - ``--country``: ISO 3166-1 alpha-2 code (e.g. ``DE``, ``NL``, ``AT``). + - ``--subdiv``: State/province code (e.g. ``BY`` for Bavaria). + - ``--category``: ``public`` (default), ``school``, ``optional``. Check + `python-holidays docs `_ for per-country options. -See ``flexmeasures add holidays --help`` for available countries and options. +See ``flexmeasures add holidays --help`` for all options. Viewing annotations diff --git a/documentation/features/forecasting.rst b/documentation/features/forecasting.rst index 8839bf34c0..3be650ed65 100644 --- a/documentation/features/forecasting.rst +++ b/documentation/features/forecasting.rst @@ -143,3 +143,43 @@ If you want to take regressors into account, in addition to merely past measurem Including regressors can significantly improve forecasting accuracy, especially when they are highly correlated with the target variable. For example, using irradiation forecasts as regressors can substantially improve solar production predictions. In `this weather forecast plugin `_, we enable you to collect regressor data for ``["temperature", "wind speed", "cloud cover", "irradiance"]``, at a location you select. +Annotation regressors +~~~~~~~~~~~~~~~~~~~~~ + +In addition to sensor-based regressors, you can use *annotation regressors* to let the forecasting model learn from binary signals derived from annotation data. Holiday flags, factory shutdowns, or any other event stored as an annotation can be passed as future covariates. + +Annotation regressors are configured in the ``annotation-regressors`` key of the forecasting config. Each entry is a dict with: + +- ``account``, ``asset``, or ``sensor`` (required): the database ID of the account, asset, or sensor whose annotations to use. +- ``annotation-type`` (optional, default ``"holiday"``): filter to annotations of this type (``"holiday"``, ``"label"``, ``"alert"``, etc.). +- ``name`` (optional): a human-readable column name for the regressor. Defaults to ``annotation_regressor_``. + +The annotation data is converted to a binary 0/1 time series at the target sensor's resolution: **1** for every time step that falls within an annotation period, **0** otherwise. Since holidays and scheduled events are typically known in advance, annotation regressors are treated as *future* covariates. + +Example config (passed via ``--config`` file): + +.. code-block:: json + + { + "annotation-regressors": [ + {"account": 1, "annotation-type": "holiday", "name": "public_holidays"}, + {"asset": 5, "annotation-type": "label", "name": "factory_shutdown"} + ] + } + +Usage: + +.. code-block:: bash + + flexmeasures add forecasts \ + --from-date 2024-01-01 --to-date 2024-12-31 \ + --max-forecast-horizon 24 \ + --sensor 42 \ + --config '{"annotation-regressors": [{"account": 1, "annotation-type": "holiday"}]}' + +.. note:: + + Holiday annotations must be added to the account or asset before running the forecast. + Use ``flexmeasures add holidays`` to populate them (supports both workalendar and the ``holidays`` + package). See :ref:`annotations` for details. + diff --git a/flexmeasures/cli/data_add.py b/flexmeasures/cli/data_add.py index 112b21dd4c..e24fe67365 100755 --- a/flexmeasures/cli/data_add.py +++ b/flexmeasures/cli/data_add.py @@ -29,6 +29,7 @@ from timely_beliefs.sensors.func_store.knowledge_horizons import x_days_ago_at_y_oclock import timely_beliefs as tb from workalendar.registry import registry as workalendar_registry +import holidays as hpkg from flexmeasures import Forecaster, Reporter from flexmeasures.cli.utils import ( @@ -917,11 +918,138 @@ def add_annotation( click.secho("Successfully added annotation.", **MsgStyle.SUCCESS) +def _make_holiday_annotation(content, date, timezone, source): + """Create (or look up) a single holiday Annotation at local midnight.""" + start = pd.Timestamp(date).tz_localize(timezone) + end = start + pd.offsets.DateOffset(days=1) + annotation, _ = get_or_create_annotation( + Annotation(content=content, start=start, end=end, source=source, type="holiday") + ) + return annotation + + +def _holidays_from_workalendar_class(year, calendar_class, calendar_kwargs, timezone): + """Load holidays via a specific workalendar class path. + + :returns: (annotations, num_holidays_dict) + """ + import importlib + + try: + module_path, class_name = calendar_class.rsplit(".", 1) + module = importlib.import_module(module_path) + cls = getattr(module, class_name) + except (ValueError, ImportError, AttributeError) as e: + click.secho( + f"Could not import calendar class '{calendar_class}': {e}", + **MsgStyle.ERROR, + ) + raise click.Abort() + + kwargs = {} + if calendar_kwargs: + try: + kwargs = json.loads(calendar_kwargs) + except json.JSONDecodeError as e: + click.secho( + f"Invalid JSON for --calendar-kwargs: {e}", + **MsgStyle.ERROR, + ) + raise click.Abort() + + source = get_or_create_source( + "workalendar", model=class_name, source_type="CLI script" + ) + calendar = cls(**kwargs) + holidays_list = calendar.holidays(year) + annotations = [ + _make_holiday_annotation(h[1], h[0], timezone, source) for h in holidays_list + ] + return annotations, {class_name: len(holidays_list)} + + +def _holidays_from_package(year, countries, subdiv, category, timezone): + """Load holidays via the holidays Python package. + + :returns: (annotations, num_holidays_dict, effective_category, model_str) + """ + if len(countries) != 1: + click.secho( + "Exactly one --country is required when using the holidays package.", + **MsgStyle.ERROR, + ) + raise click.Abort() + country = countries[0] + + supported = hpkg.list_supported_countries() + if country not in supported: + click.secho( + f"Country '{country}' is not supported by the holidays package.", + **MsgStyle.ERROR, + ) + click.secho( + f"Supported countries include: {', '.join(list(supported.keys())[:20])} ...", + **MsgStyle.WARN, + ) + raise click.Abort() + + effective_category = category or "public" + try: + h = hpkg.country_holidays( + country, + subdiv=subdiv, + years=year, + categories=(effective_category,), + ) + except (ValueError, NotImplementedError) as e: + click.secho( + f"Error creating holidays for {country}" + + (f"/{subdiv}" if subdiv else "") + + f" (category='{effective_category}'): {e}", + **MsgStyle.ERROR, + ) + click.secho( + "Check valid subdivisions and categories at https://python-holidays.readthedocs.io/", + **MsgStyle.WARN, + ) + raise click.Abort() + + model_str = f"{country}/{subdiv}" if subdiv else country + source = get_or_create_source("holidays", model=model_str, source_type="CLI script") + annotations = [ + _make_holiday_annotation(h[date], date, timezone, source) + for date in sorted(h.keys()) + ] + return annotations, {model_str: len(h)}, effective_category, model_str + + +def _holidays_from_workalendar_registry(year, countries, timezone): + """Load holidays via workalendar's ISO registry. + + :returns: (annotations, num_holidays_dict) + """ + calendars = workalendar_registry.get_calendars(countries) + annotations = [] + num_holidays = {} + for country, calendar in calendars.items(): + source = get_or_create_source( + "workalendar", model=country, source_type="CLI script" + ) + holidays_list = calendar().holidays(year) + for holiday in holidays_list: + annotations.append( + _make_holiday_annotation(holiday[1], holiday[0], timezone, source) + ) + num_holidays[country] = len(holidays_list) + return annotations, num_holidays + + @fm_add_data.command("holidays", cls=DeprecatedOptionsCommand) @with_appcontext @click.option( "--year", type=click.INT, + required=True, help="The year for which to look up holidays", ) @click.option( @@ -929,7 +1057,49 @@ def add_annotation( "countries", type=click.STRING, multiple=True, - help="The ISO 3166-1 country/region or ISO 3166-2 sub-region for which to look up holidays (such as US, BR and DE). This argument can be given multiple times.", + help="The ISO 3166-1 country/region or ISO 3166-2 sub-region for which to look up holidays (such as US, BR and DE). This argument can be given multiple times. When using the holidays package, exactly one country is required.", +) +@click.option( + "--calendar-class", + "calendar_class", + type=click.STRING, + default=None, + help="Full dotted path to a workalendar calendar class " + "(e.g. 'workalendar.europe.netherlands.NetherlandsWithSchoolHolidays'). " + "Use this for calendars not accessible via a simple country code.", +) +@click.option( + "--calendar-kwargs", + "calendar_kwargs", + type=click.STRING, + default=None, + help="JSON string of keyword arguments to pass to the workalendar calendar constructor " + '(e.g. \'{"region": "north"}\').', +) +@click.option( + "--subdiv", + "subdiv", + type=click.STRING, + default=None, + help="Subdivision (state/province) code for the holidays package (e.g. 'BY' for Bavaria). " + "If provided, the holidays package is used automatically.", +) +@click.option( + "--category", + "category", + type=click.STRING, + default=None, + help="Holiday category for the holidays package (e.g. 'school', 'optional'). " + "Defaults to 'public' when the holidays package is used. " + "If provided, the holidays package is used automatically.", +) +@click.option( + "--package", + "package", + type=click.Choice(["workalendar", "holidays"]), + default=None, + help="Which package to use: 'workalendar' (default) or 'holidays'. " + "Auto-detected from --subdiv/--category when not specified.", ) @click.option( "--asset", @@ -953,15 +1123,67 @@ def add_annotation( preferred="--account", help="Add annotations to this account. Follow up with the account's ID. This argument can be given multiple times.", ) +@click.option( + "--timezone", + "timezone", + type=click.STRING, + default=None, + help="Timezone for holiday annotations (e.g. 'Europe/Amsterdam'). " + "Defaults to the FLEXMEASURES_TIMEZONE config setting. " + "Use this to ensure holidays appear at midnight local time in the UI.", +) def add_holidays( year: int, countries: list[str], + calendar_class: str | None, + calendar_kwargs: str | None, + subdiv: str | None, + category: str | None, + package: str | None, generic_asset_ids: list[int], account_ids: list[int], + timezone: str | None, ): - """Add holiday annotations to accounts and/or assets.""" - calendars = workalendar_registry.get_calendars(countries) - num_holidays = {} + """Add holiday annotations to accounts and/or assets. + + Uses the workalendar package by default. Switch to the holidays package with + --package holidays (or automatically by passing --subdiv or --category). + Use --calendar-class to specify a specific workalendar calendar class (e.g. + NetherlandsWithSchoolHolidays) along with --calendar-kwargs for its constructor. + + \b + Examples: + # NL public holidays via workalendar (default) + flexmeasures add holidays --year 2024 --country NL --account 1 --timezone Europe/Amsterdam + + # Bavaria school holidays via holidays package + flexmeasures add holidays --year 2024 --country DE --subdiv BY --category school --account 1 + + # Netherlands school holidays (north region) via workalendar class + flexmeasures add holidays --year 2024 \\ + --calendar-class workalendar.europe.netherlands.NetherlandsWithSchoolHolidays \\ + --calendar-kwargs '{"region": "north"}' --account 1 --timezone Europe/Amsterdam + """ + from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + + # Resolve timezone: default to FLEXMEASURES_TIMEZONE config setting + if timezone is None: + timezone = app.config.get("FLEXMEASURES_TIMEZONE", "UTC") + click.secho( + f"No --timezone given; using FLEXMEASURES_TIMEZONE setting ({timezone!r}). " + "Pass --timezone explicitly to override.", + **MsgStyle.WARN, + ) + try: + ZoneInfo(timezone) + except (ZoneInfoNotFoundError, KeyError): + click.secho(f"Unknown timezone: {timezone!r}", **MsgStyle.ERROR) + raise click.Abort() + + # Determine which package to use + use_holidays_pkg = package == "holidays" or ( + package is None and (subdiv is not None or category is not None) + ) accounts = ( db.session.scalars(select(Account).filter(Account.id.in_(account_ids))).all() @@ -975,36 +1197,42 @@ def add_holidays( if generic_asset_ids else [] ) - annotations = [] - for country, calendar in calendars.items(): - _source = get_or_create_source( - "workalendar", model=country, source_type="CLI script" + + if calendar_class and not use_holidays_pkg: + annotations, num_holidays = _holidays_from_workalendar_class( + year, calendar_class, calendar_kwargs, timezone ) - holidays = calendar().holidays(year) - for holiday in holidays: - start = pd.Timestamp(holiday[0]) - end = start + pd.offsets.DateOffset(days=1) - annotation, _ = get_or_create_annotation( - Annotation( - content=holiday[1], - start=start, - end=end, - source=_source, - type="holiday", - ) - ) - annotations.append(annotation) - num_holidays[country] = len(holidays) + elif use_holidays_pkg: + annotations, num_holidays, effective_category, model_str = ( + _holidays_from_package(year, countries, subdiv, category, timezone) + ) + else: + annotations, num_holidays = _holidays_from_workalendar_registry( + year, countries, timezone + ) + db.session.add_all(annotations) for account in accounts: account.annotations += annotations for asset in assets: asset.annotations += annotations db.session.commit() - click.secho( - f"Successfully added holidays to {len(accounts)} {flexmeasures_inflection.pluralize('account', len(accounts))} and {len(assets)} {flexmeasures_inflection.pluralize('asset', len(assets))}:\n{num_holidays}", - **MsgStyle.SUCCESS, - ) + + if use_holidays_pkg: + click.secho( + f"Successfully added {len(annotations)} holidays (category='{effective_category}') " + f"for {model_str} in {year} to " + f"{len(accounts)} {flexmeasures_inflection.pluralize('account', len(accounts))} and " + f"{len(assets)} {flexmeasures_inflection.pluralize('asset', len(assets))}.", + **MsgStyle.SUCCESS, + ) + else: + click.secho( + f"Successfully added holidays to " + f"{len(accounts)} {flexmeasures_inflection.pluralize('account', len(accounts))} and " + f"{len(assets)} {flexmeasures_inflection.pluralize('asset', len(assets))}:\n{num_holidays}", + **MsgStyle.SUCCESS, + ) @fm_add_data.command("forecasts") diff --git a/flexmeasures/cli/tests/test_data_add.py b/flexmeasures/cli/tests/test_data_add.py index 5386a774bf..8c6e099191 100644 --- a/flexmeasures/cli/tests/test_data_add.py +++ b/flexmeasures/cli/tests/test_data_add.py @@ -94,3 +94,396 @@ def test_cli_help(app): result = runner.invoke(cmd, ["--help"]) check_command_ran_without_error(result) assert "Usage" in result.output + + +def test_add_holidays_with_timezone(app, fresh_db, setup_roles_users_fresh_db): + """Test that add_holidays respects --timezone and stores midnight local time.""" + from flexmeasures.cli.data_add import add_holidays + import pandas as pd + + db = fresh_db + runner = app.test_cli_runner() + result = runner.invoke( + add_holidays, + [ + "--year", + "2024", + "--country", + "NL", + "--account", + "1", + "--timezone", + "Europe/Amsterdam", + ], + ) + check_command_ran_without_error(result) + + # Christmas is Dec 25; in Amsterdam (CET = UTC+1), midnight is 23:00 UTC on Dec 24. + # Verify: annotation start for Christmas 2024 is stored as UTC 23:00 on Dec 24. + christmas = db.session.execute( + select(Annotation).filter( + Annotation.content.ilike("%Christmas%"), + Annotation.start == pd.Timestamp("2024-12-24T23:00:00Z"), + ) + ).scalar_one_or_none() + assert ( + christmas is not None + ), "Christmas annotation should start at 2024-12-24T23:00Z (midnight Amsterdam time)" + + +def test_add_holidays_with_workalendar_school_holidays( + app, fresh_db, setup_roles_users_fresh_db +): + """Test adding NetherlandsWithSchoolHolidays (north region) for 2024 via the CLI.""" + from flexmeasures.cli.data_add import add_holidays + from workalendar.europe.netherlands import NetherlandsWithSchoolHolidays + import json + + db = fresh_db + runner = app.test_cli_runner() + + result = runner.invoke( + add_holidays, + [ + "--year", + "2024", + "--calendar-class", + "workalendar.europe.netherlands.NetherlandsWithSchoolHolidays", + "--calendar-kwargs", + json.dumps({"region": "north"}), + "--account", + "1", + "--timezone", + "Europe/Amsterdam", + ], + ) + check_command_ran_without_error(result) + + # Verify count matches what the calendar directly produces + expected_count = len(NetherlandsWithSchoolHolidays(region="north").holidays(2024)) + count = db.session.scalar( + select(func.count()) + .select_from(Annotation) + .join(AccountAnnotationRelationship) + .filter( + AccountAnnotationRelationship.account_id == 1, + AccountAnnotationRelationship.annotation_id == Annotation.id, + ) + .join(DataSource) + .filter( + DataSource.id == Annotation.source_id, + DataSource.name == "workalendar", + DataSource.model == "NetherlandsWithSchoolHolidays", + ) + ) + assert count == expected_count + # NetherlandsWithSchoolHolidays returns public + school holiday days (a non-trivial set) + assert ( + count > 90 + ), f"Expected >90 NL north school+public holidays in 2024, got {count}" + + +def test_add_holidays_by_package_school(app, fresh_db, setup_roles_users_fresh_db): + """Test adding school holidays via the holidays package. + + Uses Israel (IL) which reliably supports the 'school' category across + holidays-package versions. Germany/Bavaria was removed because the installed + version of the holidays package no longer includes school holidays for DE. + """ + from flexmeasures.cli.data_add import add_holidays + + db = fresh_db + runner = app.test_cli_runner() + result = runner.invoke( + add_holidays, + [ + "--year", + "2024", + "--country", + "IL", + "--category", + "school", + "--account", + "1", + "--timezone", + "Asia/Jerusalem", + ], + ) + check_command_ran_without_error(result) + assert "Successfully added" in result.output + + # Israel has ~19 school holiday days in 2024; use 10 as a conservative lower bound. + count = db.session.scalar( + select(func.count()) + .select_from(Annotation) + .join(AccountAnnotationRelationship) + .filter( + AccountAnnotationRelationship.account_id == 1, + AccountAnnotationRelationship.annotation_id == Annotation.id, + ) + .join(DataSource) + .filter( + DataSource.id == Annotation.source_id, + DataSource.name == "holidays", + DataSource.model == "IL", + ) + ) + assert count > 10, f"Expected >10 IL school holiday days in 2024, got {count}" + + +def test_annotation_regressors_loaded_in_pipeline( + app, fresh_db, setup_roles_users_fresh_db +): + """Test annotation regressors: binary loading and CLI end-to-end. + + Setup + ----- + A factory power sensor has a perfectly constant output of 10 MW, except during + annotated shutdown periods (0 MW). Several shutdowns are added to the + 2023 training window. A forecast-window shutdown covers Jan 15-17 2024. + + Part 1 - BasePipeline._load_annotation_regressor_df + Verify the annotation DataFrame contains 1.0 during the shutdown window and + 0.0 outside it. + + Part 2 - CLI end-to-end + Invoke ``flexmeasures add forecasts`` via the Click test runner using both + the JSON double-quoted form and the Python-literal single-quoted form of + ``--annotation-regressors``. Verify no exception is raised. + + Part 3 - DB persistence + Verify that forecast beliefs were persisted for the full 4-day window. + """ + import json + from datetime import timedelta + + import pandas as pd + from sqlalchemy import insert + + from flexmeasures.data.models.annotations import get_or_create_annotation + from flexmeasures.data.services.data_sources import get_or_create_source + from flexmeasures.data.models.generic_assets import GenericAsset, GenericAssetType + from flexmeasures.data.models.time_series import Sensor, TimedBelief + from flexmeasures.data.models.data_sources import DataSource + from flexmeasures.data.models.forecasting.pipelines.base import BasePipeline + from flexmeasures.cli.data_add import add_forecast + + db = fresh_db + + # ------------------------------------------------------------------ + # 1. Create asset + sensor + # ------------------------------------------------------------------ + asset_type = GenericAssetType(name="Factory") + db.session.add(asset_type) + + factory_asset = GenericAsset(name="Test Factory", generic_asset_type=asset_type) + db.session.add(factory_asset) + db.session.flush() + + power_sensor = Sensor( + "power", + generic_asset=factory_asset, + event_resolution=timedelta(hours=1), + unit="MW", + ) + db.session.add(power_sensor) + db.session.flush() + + # ------------------------------------------------------------------ + # 2. Annotate shutdown periods (2023 training shutdowns + 2024 test shutdown) + # ------------------------------------------------------------------ + ann_source = get_or_create_source( + "test", model="logistics", source_type="CLI script" + ) + + # Quarterly shutdowns spread through 2023 give the model a strong training signal. + # Weekly shutdowns in Dec 2023 / early Jan 2024 ensure the default 30-day lookback + # window (Dec 15 – Jan 14) always contains clear shutdown examples. + shutdown_periods_training = [ + ("2023-02-15", "2023-02-17"), + ("2023-05-15", "2023-05-17"), + ("2023-08-15", "2023-08-17"), + ("2023-11-15", "2023-11-17"), + # weekly shutdowns within the default 30-day lookback + ("2023-12-18", "2023-12-20"), + ("2023-12-25", "2023-12-27"), + ("2024-01-01", "2024-01-03"), + ("2024-01-08", "2024-01-10"), + ] + forecast_shutdown = ("2024-01-15", "2024-01-17") + all_shutdown_periods = shutdown_periods_training + [forecast_shutdown] + + for start_str, end_str in all_shutdown_periods: + ann_obj = Annotation( + content="Factory shutdown", + start=pd.Timestamp(f"{start_str}T00:00:00Z"), + end=pd.Timestamp(f"{end_str}T00:00:00Z"), + source=ann_source, + type="label", + ) + ann, _ = get_or_create_annotation(ann_obj) + factory_asset.annotations.append(ann) + + db.session.flush() + + # ------------------------------------------------------------------ + # 3. Bulk-insert hourly training data: 10 MW normally, 0 MW during shutdowns + # ------------------------------------------------------------------ + data_source = DataSource(name="factory_measurements", type="demo script") + db.session.add(data_source) + db.session.flush() + + # Build a set of shutdown hours for fast lookup + shutdown_hours: set[pd.Timestamp] = set() + for start_str, end_str in all_shutdown_periods: + period = pd.date_range( + start=pd.Timestamp(f"{start_str}T00:00:00Z"), + end=pd.Timestamp(f"{end_str}T00:00:00Z"), + freq="h", + inclusive="left", + ) + shutdown_hours.update(period) + + train_start = pd.Timestamp("2023-01-01T00:00:00Z") + train_end = pd.Timestamp("2024-01-14T00:00:00Z") # up to forecast window + all_hours = pd.date_range( + start=train_start, end=train_end, freq="h", inclusive="left" + ) + + rows = [ + { + "sensor_id": power_sensor.id, + "source_id": data_source.id, + "event_start": ts.to_pydatetime(), + "belief_horizon": timedelta(0), + "cumulative_probability": 0.5, + "event_value": 0.0 if ts in shutdown_hours else 10.0, + } + for ts in all_hours + ] + db.session.execute(insert(TimedBelief), rows) + db.session.commit() + + # ------------------------------------------------------------------ + # Part 1: BasePipeline._load_annotation_regressor_df + # ------------------------------------------------------------------ + annotation_spec = { + "asset": factory_asset.id, + "annotation_type": "label", # snake_case: used directly by BasePipeline + "name": "factory_shutdown", + } + + pipeline = BasePipeline( + target_sensor=power_sensor, + future_regressors=[], + past_regressors=[], + n_steps_to_predict=48, + max_forecast_horizon=24, + forecast_frequency=1, + event_starts_after=pd.Timestamp("2024-01-14T00:00:00Z"), + event_ends_before=pd.Timestamp("2024-01-18T00:00:00Z"), + annotation_regressors=[annotation_spec], + ) + + col_name = pipeline.annotation_regressor_names[0] + + ann_df = pipeline._load_annotation_regressor_df( + spec=annotation_spec, + col_name=col_name, + start=pd.Timestamp("2024-01-14T00:00:00Z"), + end=pd.Timestamp("2024-01-18T00:00:00Z"), + ) + + assert not ann_df.empty, "Annotation regressor DataFrame should not be empty" + assert col_name in ann_df.columns + + shutdown_mask = (ann_df["event_start"] >= pd.Timestamp("2024-01-15")) & ( + ann_df["event_start"] < pd.Timestamp("2024-01-17") + ) + assert ( + ann_df.loc[shutdown_mask, col_name] == 1.0 + ).all(), "Shutdown period should be marked as 1.0" + assert ( + ann_df.loc[~shutdown_mask, col_name] == 0.0 + ).all(), "Non-shutdown period should be marked as 0.0" + + # ------------------------------------------------------------------ + # Part 2: CLI end-to-end + # ------------------------------------------------------------------ + runner = app.test_cli_runner() + sensor_id = str(power_sensor.id) + asset_id = factory_asset.id + common_args = [ + "--sensor", + sensor_id, + "--train-start", + "2023-01-01T00:00+00:00", + "--start", + "2024-01-14T00:00+00:00", + "--end", + "2024-01-18T00:00+00:00", + ] + + # --- Part 2a: JSON double-quoted form; also used for the forecast-effect check --- + json_arg = json.dumps({"asset": asset_id, "annotation-type": "label"}) + result_json = runner.invoke( + add_forecast, common_args + ["--annotation-regressors", json_arg] + ) + assert ( + "Invalid input type" not in result_json.output + ), f"CLI failed to parse JSON form:\n{result_json.output}" + assert result_json.exception is None or "ValidationError" not in str( + result_json.exception + ), f"CLI raised ValidationError (JSON form): {result_json.exception}" + assert result_json.exception is None, ( + f"CLI raised an unexpected exception (JSON form): {result_json.exception}\n" + f"{result_json.output}" + ) + + # ------------------------------------------------------------------ + # Part 3: Verify that forecast beliefs were persisted for the full window. + # + # We do not assert a specific forecast magnitude here: whether the LGBM model + # learns to produce lower values during the shutdown depends on regularisation + # hyper-parameters and data density, which vary across environments. The + # structural correctness of the annotation regressor pipeline is already + # verified in Part 1 (data loading) and Part 2 (CLI parsing + no exception). + # ------------------------------------------------------------------ + from flexmeasures.data.models.data_sources import DataSource as DS + + forecast_source = db.session.execute( + select(DS).filter(DS.model == "TrainPredictPipeline") + ).scalar_one() + + forecast_beliefs = ( + db.session.execute( + select(TimedBelief).where( + TimedBelief.sensor_id == power_sensor.id, + TimedBelief.source_id == forecast_source.id, + TimedBelief.event_start >= pd.Timestamp("2024-01-14T00:00:00Z"), + TimedBelief.event_start < pd.Timestamp("2024-01-18T00:00:00Z"), + ) + ) + .scalars() + .all() + ) + + assert forecast_beliefs, "No forecast beliefs found in DB after CLI invocation" + assert len(forecast_beliefs) == 4 * 24, ( + f"Expected 96 hourly forecast beliefs for the 4-day window, " + f"got {len(forecast_beliefs)}" + ) + + # --- Part 2b: Python-literal single-quoted form – parsing only, no DB check --- + # The second invocation writes to the same window; we only care that argument + # parsing succeeds (no marshmallow ValidationError), not about DB uniqueness. + literal_arg = str({"asset": asset_id, "annotation-type": "label"}) + result_literal = runner.invoke( + add_forecast, common_args + ["--annotation-regressors", literal_arg] + ) + assert ( + "Invalid input type" not in result_literal.output + ), f"CLI failed to parse Python-literal form:\n{result_literal.output}" + assert result_literal.exception is None or "ValidationError" not in str( + result_literal.exception + ), f"CLI raised ValidationError (Python-literal form): {result_literal.exception}" diff --git a/flexmeasures/cli/utils.py b/flexmeasures/cli/utils.py index 62ecbb21fd..388357854f 100644 --- a/flexmeasures/cli/utils.py +++ b/flexmeasures/cli/utils.py @@ -4,6 +4,7 @@ from __future__ import annotations +import ast from typing import Any from datetime import datetime, timedelta @@ -370,16 +371,35 @@ def tabulate_account_assets(assets): ) -class JSONOrFile(click.ParamType): - """ - A Click parameter type that accepts either a JSON string or a file path - to a JSON file. +class NestedDictParamType(click.ParamType): + """Click parameter type that parses a JSON object or a Python-literal dict string. - It attempts to load the input as a file first. If that fails, it assumes - the input is a JSON string and tries to parse it. + Accepts both JSON double-quoted syntax (``{"key": "value"}``) and Python-literal + single-quoted syntax (``{'key': 'value'}``). Used for CLI options whose Marshmallow + field type is ``fields.List(fields.Nested(...))``. """ - name = "json_or_file" + name = "DICT" + + def convert(self, value, param, ctx): + if isinstance(value, dict): + return value + try: + return json.loads(value) + except json.JSONDecodeError: + try: + return ast.literal_eval(value) + except (ValueError, SyntaxError): + self.fail( + f"Cannot parse as a JSON object or Python-literal dict: {value!r}", + param, + ctx, + ) + + +class JSONOrFile(click.ParamType): + + name = "JSON_OR_FILE" def convert(self, value, param, ctx): """ @@ -477,7 +497,11 @@ def decorator(command): kwargs["type"] = str elif isinstance(field, fields.List): kwargs["multiple"] = True - kwargs["type"] = str + if isinstance(field.inner, fields.Nested): + # Each value is a dict string; parse it at the Click level. + kwargs["type"] = NestedDictParamType() + else: + kwargs["type"] = str command = click.option(*options, **kwargs)(command) diff --git a/flexmeasures/data/models/forecasting/pipelines/base.py b/flexmeasures/data/models/forecasting/pipelines/base.py index 120988d8ec..8f8a37b6c7 100644 --- a/flexmeasures/data/models/forecasting/pipelines/base.py +++ b/flexmeasures/data/models/forecasting/pipelines/base.py @@ -14,6 +14,18 @@ from flexmeasures.data.models.forecasting.exceptions import NotEnoughDataException +class _AnnotationRegressorProxy: + """Minimal proxy so annotation regressors can reuse sensor-based pipeline utilities. + + Provides event_resolution and basic attributes expected by detect_and_fill_missing_values. + """ + + def __init__(self, name: str, event_resolution): + self.name = name + self.id = f"annotation:{name}" + self.event_resolution = event_resolution + + class BasePipeline: """ Base class for Train and Predict pipelines. @@ -69,6 +81,7 @@ def __init__( predict_start: datetime | None = None, predict_end: datetime | None = None, missing_threshold: float = 1.0, + annotation_regressors: list[dict] | None = None, ) -> None: self.future = future_regressors self.past = past_regressors @@ -105,6 +118,20 @@ def __init__( ) # convert max_forecast_horizon to hours self.forecast_frequency = forecast_frequency self.missing_threshold = missing_threshold + self.annotation_regressors = annotation_regressors or [] + # Build column names and proxy objects for annotation regressors + # Use `or` so that None or empty-string names fall back to the default. + self.annotation_regressor_proxies = [ + _AnnotationRegressorProxy( + name=spec.get("name") or f"annotation_regressor_{i}", + event_resolution=target_sensor.event_resolution, + ) + for i, spec in enumerate(self.annotation_regressors) + ] + self.annotation_regressor_names = [ + f"{proxy.name} (annotation)_AR-{i}" + for i, proxy in enumerate(self.annotation_regressor_proxies) + ] def load_data_all_beliefs(self) -> pd.DataFrame: """ @@ -151,13 +178,19 @@ def load_data_all_beliefs(self) -> pd.DataFrame: ), # we exclude forecasters for target dataframe as to not use forecasts in target. ) try: - # We resample regressors to the target sensor’s resolution so they align in time. + # We resample regressors to the target sensor's resolution so they align in time. # This ensures the resulting DataFrame can be used directly for predictions. - df = tb_utils.replace_multi_index_level( - df, - "event_start", - df.event_starts.floor(self.target_sensor.event_resolution), - ) + event_starts = df.event_starts + try: + floored = event_starts.floor(self.target_sensor.event_resolution) + except Exception: + # DST ambiguity: convert to UTC, floor, convert back to original tz. + floored = ( + event_starts.tz_convert("UTC") + .floor(self.target_sensor.event_resolution) + .tz_convert(event_starts.tz) + ) + df = tb_utils.replace_multi_index_level(df, "event_start", floored) except Exception as e: logging.warning(f"Error during custom resample for {name}: {e}") @@ -191,8 +224,145 @@ def load_data_all_beliefs(self) -> pd.DataFrame: data_pd["belief_time"], utc=True ).dt.tz_localize(None) + # Append annotation regressors as future covariates + if self.annotation_regressors: + ann_end = self.event_ends_before + pd.Timedelta( + hours=self.max_forecast_horizon_in_hours + ) + for spec, col_name in zip( + self.annotation_regressors, self.annotation_regressor_names + ): + ann_df = self._load_annotation_regressor_df( + spec=spec, + col_name=col_name, + start=self.event_starts_after, + end=ann_end, + ) + if not ann_df.empty: + data_pd = data_pd.merge( + ann_df[["event_start", col_name]], + on="event_start", + how="left", + ) + data_pd[col_name] = data_pd[col_name].fillna(0.0) + logging.debug( + "Added %d annotation regressor(s) to data: %s", + len(self.annotation_regressors), + self.annotation_regressor_names, + ) + return data_pd + def _load_annotation_regressor_df( + self, + spec: dict, + col_name: str, + start: datetime, + end: datetime, + ) -> pd.DataFrame: + """Load an annotation regressor as a binary 0/1 time series DataFrame. + + Queries annotations for the given account, asset, or sensor, then marks each + time step at the target sensor's resolution as 1 (if an annotation + covers it) or 0 (otherwise). + + Annotations are treated as "always known" (belief_time = epoch start), + making them suitable as future covariates for known future events like + public holidays. + + :param spec: Dict with 'account', 'asset', or 'sensor' (ID), and optionally + 'annotation_type' (default: 'holiday') and 'name'. + :param col_name: Column name to use in the returned DataFrame. + :param start: Start of the time range (inclusive). + :param end: End of the time range (exclusive). + :returns: DataFrame with columns [event_start, belief_time, col_name]. + """ + from flexmeasures.data import db + from flexmeasures.data.queries.annotations import ( + query_asset_annotations, + query_account_annotations, + query_sensor_annotations, + ) + + annotation_type = spec.get("annotation_type", "holiday") + account_id = spec.get("account") + asset_id = spec.get("asset") + sensor_id = spec.get("sensor") + + if account_id is not None: + query = query_account_annotations( + account_id=account_id, + annotations_after=start, + annotations_before=end, + annotation_type=annotation_type, + ) + elif asset_id is not None: + query = query_asset_annotations( + asset_id=asset_id, + annotations_after=start, + annotations_before=end, + annotation_type=annotation_type, + ) + elif sensor_id is not None: + query = query_sensor_annotations( + sensor_id=sensor_id, + annotations_after=start, + annotations_before=end, + annotation_type=annotation_type, + ) + else: + logging.warning( + "Annotation regressor spec %r (column: %s) has no 'account', 'asset', or 'sensor'; skipping.", + spec, + col_name, + ) + return pd.DataFrame(columns=["event_start", "belief_time", col_name]) + + annotations = db.session.execute(query).scalars().all() + + # Build the full time index at target resolution. + # Normalise start/end to UTC first so pd.date_range never sees two + # tz-aware endpoints with different UTC offsets (e.g. CET vs CEST). + resolution = self.target_sensor.event_resolution + start_utc = ( + pd.Timestamp(start).tz_convert("UTC") + if pd.Timestamp(start).tzinfo + else pd.Timestamp(start, tz="UTC") + ) + end_utc = ( + pd.Timestamp(end).tz_convert("UTC") + if pd.Timestamp(end).tzinfo + else pd.Timestamp(end, tz="UTC") + ) + time_index = pd.date_range( + start=start_utc, end=end_utc, freq=resolution, inclusive="left" + ) + # Strip timezone info to match the convention in load_data_all_beliefs + time_index = time_index.tz_localize(None) + + binary = pd.Series(0.0, index=time_index, name=col_name) + + for ann in annotations: + ann_start = pd.Timestamp(ann.start) + ann_end = pd.Timestamp(ann.end) + if ann_start.tzinfo is not None: + ann_start = ann_start.tz_convert("UTC").tz_localize(None) + if ann_end.tzinfo is not None: + ann_end = ann_end.tz_convert("UTC").tz_localize(None) + mask = (binary.index >= ann_start) & (binary.index < ann_end) + binary.loc[mask] = 1.0 + + # Use epoch as belief_time so annotations are always considered "known" + belief_time = pd.Timestamp("1970-01-01") + df = pd.DataFrame( + { + "event_start": time_index, + "belief_time": belief_time, + col_name: binary.values, + } + ) + return df + def split_data_all_beliefs( # noqa: C901 self, df: pd.DataFrame, is_predict_pipeline: bool = False ) -> tuple[ @@ -471,8 +641,9 @@ def _latest_known_by_event_start( future_covariates = self.detect_and_fill_missing_values( df=future_df, - sensors=self.future, - sensor_names=self.future_regressors, + sensors=self.future + self.annotation_regressor_proxies, + sensor_names=self.future_regressors + + self.annotation_regressor_names, start=target_start, end=forecast_end + self.target_sensor.event_resolution, ) @@ -502,7 +673,7 @@ def _latest_known_by_event_start( ) # Autoregressive-only case - if not self.past and not self.future: + if not self.past and not self.future and not self.annotation_regressors: logging.info("Using autoregressive forecasting.") y = df[["event_start", "belief_time", self.target]].copy() @@ -519,10 +690,50 @@ def _latest_known_by_event_start( else None ) X_future_regressors_df = ( - df[["event_start", "belief_time"] + self.future_regressors] - if self.future != [] + df[ + ["event_start", "belief_time"] + + self.future_regressors + + self.annotation_regressor_names + ] + if self.future != [] or self.annotation_regressors else None ) + + # Annotation regressors are "always known" (holiday/shutdown calendars), + # but they inherit the target sensor's belief_time via the merge in + # load_data_all_beliefs. The split logic needs two separate sets of rows + # for each annotation event so that: + # - realized_slice (training window): belief_time = event_start + resolution + # satisfies the `belief_time > event_start` ("realized_only") filter. + # - fc_window (forecast window): belief_time = epoch (1970-01-01) + # satisfies `belief_time <= event_start` and is always <= any current time. + # Without these extra rows annotation data is invisible to the model. + if self.annotation_regressors and X_future_regressors_df is not None: + resolution = self.target_sensor.event_resolution + ann_base = ( + X_future_regressors_df[ + ["event_start"] + self.annotation_regressor_names + ] + .drop_duplicates("event_start") + .copy() + ) + # Sensor-based future regressors carry no annotation info in these rows. + for col in self.future_regressors: + ann_base[col] = float("nan") + + epoch_rows = ann_base.copy() + epoch_rows["belief_time"] = pd.Timestamp("1970-01-01") + + post_rows = ann_base.copy() + post_rows["belief_time"] = ann_base["event_start"] + resolution + + extra = pd.concat([epoch_rows, post_rows], ignore_index=True) + extra = extra[X_future_regressors_df.columns] + X_future_regressors_df = ( + pd.concat([X_future_regressors_df, extra], ignore_index=True) + .sort_values(["event_start", "belief_time"]) + .reset_index(drop=True) + ) y = ( df[["event_start", "belief_time", self.target]] .dropna() diff --git a/flexmeasures/data/models/forecasting/pipelines/predict.py b/flexmeasures/data/models/forecasting/pipelines/predict.py index 54ae993059..eb606aa487 100644 --- a/flexmeasures/data/models/forecasting/pipelines/predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/predict.py @@ -40,6 +40,7 @@ def __init__( predict_end: datetime | None = None, data_source: Source = None, missing_threshold: float = 1.0, + annotation_regressors: list[dict] | None = None, ) -> None: """ Initialize the PredictPipeline. @@ -78,6 +79,7 @@ def __init__( missing_threshold=missing_threshold, save_belief_time=save_belief_time, beliefs_before=beliefs_before, + annotation_regressors=annotation_regressors, ) self.model_path = model_path self.output_path = output_path diff --git a/flexmeasures/data/models/forecasting/pipelines/train.py b/flexmeasures/data/models/forecasting/pipelines/train.py index dc9aa48a76..4d6f3d0183 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train.py +++ b/flexmeasures/data/models/forecasting/pipelines/train.py @@ -32,6 +32,7 @@ def __init__( probabilistic: bool = False, ensure_positive: bool = False, missing_threshold: float = 1.0, + annotation_regressors: list[dict] | None = None, ) -> None: """ Initialize the TrainPipeline. @@ -53,7 +54,11 @@ def __init__( self.model_save_dir = model_save_dir self.probabilistic = probabilistic self.auto_regressive = ( - True if not past_regressors and not future_regressors else False + True + if not past_regressors + and not future_regressors + and not annotation_regressors + else False ) self.ensure_positive = ensure_positive super().__init__( @@ -68,6 +73,7 @@ def __init__( beliefs_before=beliefs_before, forecast_frequency=forecast_frequency, missing_threshold=missing_threshold, + annotation_regressors=annotation_regressors, ) def train_model( diff --git a/flexmeasures/data/models/forecasting/pipelines/train_predict.py b/flexmeasures/data/models/forecasting/pipelines/train_predict.py index 88883e132c..c3cebcd302 100644 --- a/flexmeasures/data/models/forecasting/pipelines/train_predict.py +++ b/flexmeasures/data/models/forecasting/pipelines/train_predict.py @@ -123,9 +123,8 @@ def run_cycle( probabilistic=self._parameters["probabilistic"], ensure_positive=self._config["ensure_positive"], missing_threshold=self._config.get("missing_threshold"), + annotation_regressors=self._config.get("annotation_regressors", []), ) - - logging.info(f"Training cycle from {train_start} to {train_end} started ...") train_start_time = time.time() train_pipeline.run(counter=counter) train_runtime = time.time() - train_start_time @@ -164,6 +163,7 @@ def run_cycle( sensor_to_save=self._parameters["sensor_to_save"], data_source=self.data_source, missing_threshold=self._config.get("missing_threshold"), + annotation_regressors=self._config.get("annotation_regressors", []), ) logging.info( f"Prediction cycle from {predict_start} to {predict_end} started ..." @@ -193,12 +193,16 @@ def _compute_forecast(self, as_job: bool = False, **kwargs) -> list[dict[str, An def _derive_training_period(self) -> tuple[datetime, datetime]: """Derive the effective training period for model fitting. - The training period ends at ``predict_start`` and starts at the - most restrictive (latest) of the following: + Priority (most restrictive start date wins): + + 1. ``train_start`` (if explicitly configured via ``--train-start``). + 2. ``predict_start - train_period`` (if ``--train-period`` was explicitly set). + 3. ``predict_start - max_training_period`` (always enforced as the outer bound). - - The configured ``start_date`` (if any) - - ``predict_start - train_period_in_hours`` (if configured) - - ``predict_start - max_training_period`` (always enforced) + When ``--train-start`` is set the ``--train-period`` is ignored – the + effective period is simply ``predict_start - train_start``, capped to + ``max_training_period``. This prevents the old 30-day default from + silently overriding an explicit start date. Additionally, the resulting training window is guaranteed to span at least two days. @@ -210,18 +214,20 @@ def _derive_training_period(self) -> tuple[datetime, datetime]: configured_start: datetime | None = self._config.get("train_start") period_hours: int | None = self._config.get("train_period_in_hours") - candidates: list[datetime] = [] + # Outer bound: never go further back than max_training_period. + max_period_start = train_end - self._config["max_training_period"] if configured_start is not None: - candidates.append(configured_start) - - if period_hours is not None: - candidates.append(train_end - timedelta(hours=period_hours)) - - # Always enforce maximum training period - candidates.append(train_end - self._config["max_training_period"]) - - train_start = max(candidates) + # Explicit train_start takes full precedence; period is ignored. + train_start = max(configured_start, max_period_start) + elif period_hours is not None: + # Explicit train_period without train_start. + train_start = max( + train_end - timedelta(hours=period_hours), max_period_start + ) + else: + # Neither set: use the full max_training_period window. + train_start = max_period_start # Enforce minimum training period of 2 days min_training_period = timedelta(days=2) diff --git a/flexmeasures/data/queries/annotations.py b/flexmeasures/data/queries/annotations.py index 0a56e571c4..94daaa1381 100644 --- a/flexmeasures/data/queries/annotations.py +++ b/flexmeasures/data/queries/annotations.py @@ -7,7 +7,9 @@ from flexmeasures.data.models.annotations import ( Annotation, + AccountAnnotationRelationship, GenericAssetAnnotationRelationship, + SensorAnnotationRelationship, ) from flexmeasures.data.models.data_sources import DataSource @@ -46,3 +48,75 @@ def query_asset_annotations( Annotation.type == annotation_type, ) return query + + +def query_account_annotations( + account_id: int, + annotations_after: datetime | None = None, + annotations_before: datetime | None = None, + sources: list[DataSource] | None = None, + annotation_type: str | None = None, +) -> Query: + """Match annotations assigned to the given account.""" + query = ( + select(Annotation) + .join(AccountAnnotationRelationship) + .filter( + AccountAnnotationRelationship.account_id == account_id, + AccountAnnotationRelationship.annotation_id == Annotation.id, + ) + ) + + if annotations_after is not None: + query = query.filter( + Annotation.end > annotations_after, + ) + if annotations_before is not None: + query = query.filter( + Annotation.start < annotations_before, + ) + if sources: + query = query.filter( + Annotation.source.in_(sources), + ) + if annotation_type is not None: + query = query.filter( + Annotation.type == annotation_type, + ) + return query + + +def query_sensor_annotations( + sensor_id: int, + annotations_after: datetime | None = None, + annotations_before: datetime | None = None, + sources: list[DataSource] | None = None, + annotation_type: str | None = None, +) -> Query: + """Match annotations assigned to the given sensor.""" + query = ( + select(Annotation) + .join(SensorAnnotationRelationship) + .filter( + SensorAnnotationRelationship.sensor_id == sensor_id, + SensorAnnotationRelationship.annotation_id == Annotation.id, + ) + ) + + if annotations_after is not None: + query = query.filter( + Annotation.end > annotations_after, + ) + if annotations_before is not None: + query = query.filter( + Annotation.start < annotations_before, + ) + if sources: + query = query.filter( + Annotation.source.in_(sources), + ) + if annotation_type is not None: + query = query.filter( + Annotation.type == annotation_type, + ) + return query diff --git a/flexmeasures/data/schemas/forecasting/pipeline.py b/flexmeasures/data/schemas/forecasting/pipeline.py index 1d4fe62d7d..70a152d03d 100644 --- a/flexmeasures/data/schemas/forecasting/pipeline.py +++ b/flexmeasures/data/schemas/forecasting/pipeline.py @@ -12,6 +12,7 @@ validates_schema, pre_load, post_load, + post_dump, ValidationError, ) @@ -26,6 +27,41 @@ from flexmeasures.utils.time_utils import server_now +class AnnotationRegressorSchema(Schema): + """Schema for a single annotation regressor in the forecasting pipeline config.""" + + account = fields.Int( + load_default=None, + metadata={"description": "Account ID whose annotations to use."}, + ) + asset = fields.Int( + load_default=None, + metadata={"description": "Asset ID whose annotations to use."}, + ) + sensor = fields.Int( + load_default=None, + metadata={"description": "Sensor ID whose annotations to use."}, + ) + annotation_type = fields.Str( + data_key="annotation-type", + load_default="holiday", + metadata={ + "description": "Type of annotation to use (e.g. 'holiday', 'label', 'alert'). Defaults to 'holiday'." + }, + ) + name = fields.Str( + load_default=None, + metadata={ + "description": "Human-readable column name for this regressor. Defaults to 'annotation_regressor_'." + }, + ) + + @post_dump + def remove_none_values(self, data, **kwargs): + """Omit null fields from the serialised config to keep it clean.""" + return {k: v for k, v in data.items() if v is not None} + + class TrainPredictPipelineConfigSchema(Schema): model = fields.String(load_default="CustomLGBM") @@ -74,6 +110,25 @@ class TrainPredictPipelineConfigSchema(Schema): }, }, ) + annotation_regressors = fields.List( + fields.Nested(AnnotationRegressorSchema()), + data_key="annotation-regressors", + load_default=[], + metadata={ + "description": ( + "Annotation sources to use as binary future regressors. " + "Each entry must specify 'account', 'asset', or 'sensor' (ID), and optionally " + "'annotation-type' (default: 'holiday') and 'name' (default: auto-generated). " + "Annotations are converted to a binary 0/1 time series: 1 during annotated periods." + ), + "example": [ + {"account": 1, "annotation-type": "holiday", "name": "holidays"} + ], + "cli": { + "option": "--annotation-regressors", + }, + }, + ) missing_threshold = fields.Float( data_key="missing-threshold", load_default=1.0, @@ -114,10 +169,15 @@ class TrainPredictPipelineConfigSchema(Schema): ) train_period = DurationField( data_key="train-period", - load_default=timedelta(days=30), + load_default=None, allow_none=True, metadata={ - "description": "Duration of the initial training period (ISO 8601 format, min 2 days). If not set, derived from train_start and start if not set or defaults to P30D (30 days).", + "description": ( + "Duration of the initial training period (ISO 8601 format, min 2 days). " + "If not set and --train-start is provided, the period is the difference " + "between --start and --train-start, capped to --max-training-period. " + "If neither is set, --max-training-period is used as the training window." + ), "example": "P7D", "cli": { "cli-exclusive": True, @@ -193,9 +253,16 @@ def resolve_config(self, data: dict, **kwargs) -> dict: # noqa: C901 data["future_regressors"] = future_regressors data["past_regressors"] = past_regressors - train_period_in_hours = data["train_period"] // timedelta(hours=1) + train_period_in_hours = ( + data["train_period"] // timedelta(hours=1) + if data.get("train_period") is not None + else None + ) max_training_period = data["max_training_period"] - if train_period_in_hours > max_training_period // timedelta(hours=1): + if ( + train_period_in_hours is not None + and train_period_in_hours > max_training_period // timedelta(hours=1) + ): train_period_in_hours = max_training_period // timedelta(hours=1) logging.warning( f"train-period is greater than max-training-period ({max_training_period}), setting train-period to max-training-period", diff --git a/flexmeasures/ui/static/openapi-specs.json b/flexmeasures/ui/static/openapi-specs.json index eaa82471b1..506d4f7097 100644 --- a/flexmeasures/ui/static/openapi-specs.json +++ b/flexmeasures/ui/static/openapi-specs.json @@ -4616,6 +4616,49 @@ }, "additionalProperties": false }, + "AnnotationRegressor": { + "type": "object", + "properties": { + "account": { + "type": [ + "integer", + "null" + ], + "default": null, + "description": "Account ID whose annotations to use." + }, + "asset": { + "type": [ + "integer", + "null" + ], + "default": null, + "description": "Asset ID whose annotations to use." + }, + "sensor": { + "type": [ + "integer", + "null" + ], + "default": null, + "description": "Sensor ID whose annotations to use." + }, + "annotation-type": { + "type": "string", + "default": "holiday", + "description": "Type of annotation to use (e.g. 'holiday', 'label', 'alert'). Defaults to 'holiday'." + }, + "name": { + "type": [ + "string", + "null" + ], + "default": null, + "description": "Human-readable column name for this regressor. Defaults to 'annotation_regressor_'." + } + }, + "additionalProperties": false + }, "TrainPredictPipelineConfigSchemaOpenAPI": { "type": "object", "properties": { @@ -4659,6 +4702,21 @@ "type": "integer" } }, + "annotation-regressors": { + "type": "array", + "default": [], + "description": "Annotation sources to use as binary future regressors. Each entry must specify 'account', 'asset', or 'sensor' (ID), and optionally 'annotation-type' (default: 'holiday') and 'name' (default: auto-generated). Annotations are converted to a binary 0/1 time series: 1 during annotated periods.", + "example": [ + { + "account": 1, + "annotation-type": "holiday", + "name": "holidays" + } + ], + "items": { + "$ref": "#/components/schemas/AnnotationRegressor" + } + }, "missing-threshold": { "type": "number", "default": 1.0, diff --git a/pyproject.toml b/pyproject.toml index 9a42903333..a7e884f73b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "iso8601>=2.1.0", "xlrd>=2.0.2", "workalendar>=17.0.0", + "holidays>=0.57", "inflection>=0.5.1", "inflect>=7.5.0", "pydantic>=2.12.5", diff --git a/uv.lock b/uv.lock index 13956c991e..f0b5695086 100644 --- a/uv.lock +++ b/uv.lock @@ -1173,6 +1173,7 @@ dependencies = [ { name = "flask-swagger-ui", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, { name = "flask-wtf", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, { name = "highspy", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, + { name = "holidays", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, { name = "humanize", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, { name = "inflect", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, { name = "inflection", marker = "(os_name != 'nt' and sys_platform == 'darwin') or (os_name != 'nt' and sys_platform == 'linux') or sys_platform == 'win32'" }, @@ -1293,6 +1294,7 @@ requires-dist = [ { name = "flask-swagger-ui", specifier = ">=5.21.0" }, { name = "flask-wtf", specifier = ">=1.2.2" }, { name = "highspy", specifier = ">=1.12" }, + { name = "holidays", specifier = ">=0.57" }, { name = "humanize", specifier = ">=4.15.0" }, { name = "inflect", specifier = ">=7.5.0" }, { name = "inflection", specifier = ">=0.5.1" },