Skip to content

Commit 4bbe554

Browse files
authored
Fixed EndDeviceList pollRate Notifications (#307)
1 parent 66b3d4e commit 4bbe554

File tree

8 files changed

+189
-15
lines changed

8 files changed

+189
-15
lines changed

src/envoy/admin/crud/aggregator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Sequence, Iterable
1+
from typing import Iterable, Sequence
22

33
import sqlalchemy as sa
44
from sqlalchemy.ext.asyncio import AsyncSession
@@ -14,7 +14,7 @@ async def count_all_aggregators(session: AsyncSession) -> int:
1414
return resp.scalar_one()
1515

1616

17-
async def select_all_aggregators(session: AsyncSession, start: int, limit: int) -> Sequence[Aggregator]:
17+
async def select_all_aggregators(session: AsyncSession, start: int | None, limit: int | None) -> Sequence[Aggregator]:
1818
"""Admin selecting of aggregators - will include domains relationship (the NULL_AGGREGATOR is not included)"""
1919

2020
stmt = (

src/envoy/admin/manager/config.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@ async def update_current_config(session: AsyncSession, updated_values: RuntimeSe
3535
else:
3636
existing_db_config.changed_time = now
3737

38+
changed_fsal_pollrate = False
39+
changed_edevl_pollrate = False
40+
3841
if updated_values.dcap_pollrate_seconds is not None:
3942
existing_db_config.dcap_pollrate_seconds = updated_values.dcap_pollrate_seconds
4043

4144
if updated_values.edevl_pollrate_seconds is not None:
45+
changed_edevl_pollrate = existing_db_config.edevl_pollrate_seconds != updated_values.edevl_pollrate_seconds
4246
existing_db_config.edevl_pollrate_seconds = updated_values.edevl_pollrate_seconds
4347

4448
if updated_values.fsal_pollrate_seconds is not None:
49+
changed_fsal_pollrate = existing_db_config.fsal_pollrate_seconds != updated_values.fsal_pollrate_seconds
4550
existing_db_config.fsal_pollrate_seconds = updated_values.fsal_pollrate_seconds
4651

4752
if updated_values.derpl_pollrate_seconds is not None:
@@ -61,7 +66,13 @@ async def update_current_config(session: AsyncSession, updated_values: RuntimeSe
6166

6267
await session.commit()
6368

64-
await NotificationManager.notify_changed_deleted_entities(SubscriptionResource.FUNCTION_SET_ASSIGNMENTS, now)
69+
if changed_fsal_pollrate:
70+
await NotificationManager.notify_changed_deleted_entities(
71+
SubscriptionResource.FUNCTION_SET_ASSIGNMENTS, now
72+
)
73+
74+
if changed_edevl_pollrate:
75+
await NotificationManager.notify_changed_deleted_entities(SubscriptionResource.SITE, now)
6576

6677
@staticmethod
6778
async def fetch_config_response(session: AsyncSession) -> RuntimeServerConfigResponse:

src/envoy/notification/crud/batch.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from sqlalchemy.ext.asyncio import AsyncSession
77
from sqlalchemy.orm import selectinload
88

9+
from envoy.admin.crud.aggregator import select_all_aggregators
910
from envoy.notification.crud.archive import (
1011
fetch_entities_with_archive_by_datetime,
1112
fetch_entities_with_archive_by_id,
@@ -25,6 +26,7 @@
2526
from envoy.server.crud.common import localize_start_time_for_entity
2627
from envoy.server.crud.server import select_server_config
2728
from envoy.server.manager.der_constants import PUBLIC_SITE_DER_ID
29+
from envoy.server.model.aggregator import Aggregator
2830
from envoy.server.model.archive.doe import ArchiveDynamicOperatingEnvelope, ArchiveSiteControlGroup
2931
from envoy.server.model.archive.site import (
3032
ArchiveDefaultSiteControl,
@@ -90,6 +92,21 @@ def __init__(
9092
self.models_by_batch_key = AggregatorBatchedEntities._generate_batch_dict(resource, models)
9193
self.deleted_by_batch_key = AggregatorBatchedEntities._generate_batch_dict(resource, deleted_models)
9294

95+
@staticmethod
96+
def aggregator_id_instance(
97+
timestamp: datetime, resource: SubscriptionResource, aggregators: Sequence[Aggregator]
98+
) -> "AggregatorBatchedEntities":
99+
"""This will generate an instance with the models_by_batch_key loaded with a key for each aggregator instance
100+
(key being a single tuple[aggregator_id: int]). Each of the entries will be an empty list.
101+
102+
This will be a mechanism for prepping an "empty list" notification for representing things like a pollRate
103+
change"""
104+
105+
batch = AggregatorBatchedEntities(timestamp, resource, [], [])
106+
for agg in aggregators:
107+
batch.models_by_batch_key[(agg.aggregator_id,)] = []
108+
return batch
109+
93110

94111
def get_batch_key(resource: SubscriptionResource, entity: TResourceModel) -> tuple:
95112
"""
@@ -101,7 +118,7 @@ def get_batch_key(resource: SubscriptionResource, entity: TResourceModel) -> tup
101118
102119
Given the SubscriptionResource - it's safe to rely on the ordering of the batch key tuple entries:
103120
104-
SubscriptionResource.SITE: (aggregator_id: int, site_id: int)
121+
SubscriptionResource.SITE: (aggregator_id: int, site_id: int) OR (aggregator_id: int)
105122
SubscriptionResource.DYNAMIC_OPERATING_ENVELOPE: (aggregator_id: int, site_id: int, site_control_group_id: int)
106123
SubscriptionResource.READING: (aggregator_id: int, site_id: int, group_id: int)
107124
SubscriptionResource.TARIFF_GENERATED_RATE: (aggregator_id: int, tariff_id: int, site_id: int, day: date)
@@ -247,10 +264,21 @@ async def fetch_sites_by_changed_at(
247264
) -> AggregatorBatchedEntities[Site, ArchiveSite]:
248265
"""Fetches all sites matching the specified changed_at and returns them keyed by their aggregator/site id
249266
250-
Also fetches any site from the archive that was deleted at the specified timestamp"""
267+
Also fetches any site from the archive that was deleted at the specified timestamp.
251268
252-
active_sites, deleted_sites = await fetch_entities_with_archive_by_datetime(session, Site, ArchiveSite, timestamp)
269+
This will also consider runtime config for the EndDeviceList.pollRate - If this is altered, it will instead
270+
generate a notification for EVERY EndDevice"""
271+
272+
# If run time config has been marked as a "Site" update - it's because it has had a change in poll rate
273+
# In this circumstance - we generate a special kind of update that will result in an "Empty List" Notification
274+
# that will just show the pollRate change.
275+
runtime_cfg = await select_server_config(session)
276+
if runtime_cfg is not None and runtime_cfg.changed_time == timestamp:
277+
aggregators = await select_all_aggregators(session, None, None)
278+
return AggregatorBatchedEntities.aggregator_id_instance(timestamp, SubscriptionResource.SITE, aggregators)
253279

280+
# Otherwise - we proceed as if sites are the table that is changing
281+
active_sites, deleted_sites = await fetch_entities_with_archive_by_datetime(session, Site, ArchiveSite, timestamp)
254282
return AggregatorBatchedEntities(timestamp, SubscriptionResource.SITE, active_sites, deleted_sites)
255283

256284

@@ -691,7 +719,7 @@ async def fetch_default_site_controls_by_changed_at(
691719
async def fetch_runtime_config_by_changed_at(
692720
session: AsyncSession, timestamp: datetime
693721
) -> AggregatorBatchedEntities[SiteScopedRuntimeServerConfig, ArchiveSiteScopedRuntimeServerConfig]: # type: ignore # noqa: E501
694-
"""Fetches all RuntimeServerCOnfig instances matching the specified changed_at and returns them keyed by their
722+
"""Fetches all RuntimeServerConfig instances matching the specified changed_at and returns them keyed by their
695723
aggregator/site id"""
696724

697725
runtime_cfg = await select_server_config(session)

src/envoy/notification/task/check.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ def entities_to_notification(
235235
scope = scope_for_subscription(sub, href_prefix)
236236
if resource == SubscriptionResource.SITE:
237237
return NotificationMapper.map_sites_to_response(
238-
cast(Sequence[Site], entities), sub, scope, notification_type, config.disable_edev_registration # type: ignore # mypy quirk # noqa: E501
238+
cast(Sequence[Site], entities), sub, scope, notification_type, config.disable_edev_registration, config.edevl_pollrate_seconds # type: ignore # mypy quirk # noqa: E501
239239
)
240240
elif resource == SubscriptionResource.TARIFF_GENERATED_RATE:
241241
if pricing_reading_type is None:
@@ -406,10 +406,27 @@ async def check_db_change_or_delete(
406406
if entity_limit > MAX_NOTIFICATION_PAGE_SIZE:
407407
entity_limit = MAX_NOTIFICATION_PAGE_SIZE
408408

409-
entities_to_notify = entities_serviced_by_subscription(sub, resource, entities)
410-
all_notifications.extend(
411-
get_entity_pages(resource, sub, batch_key, entity_limit, entities_to_notify, notification_type)
412-
)
409+
if entities:
410+
# Normally we're going to have a batch of entities that should be sent out via notifications
411+
entities_to_notify = entities_serviced_by_subscription(sub, resource, entities)
412+
all_notifications.extend(
413+
get_entity_pages(resource, sub, batch_key, entity_limit, entities_to_notify, notification_type)
414+
)
415+
else:
416+
# But we can end up in this state if the subscription is at the List and an attribute on the list has
417+
# changed (eg pollRate) - i.e. there are no child list items to indicate as changed - JUST the list.
418+
if sub.resource_type == resource:
419+
# All we need is a match on the type of subscription to generate the subscription
420+
all_notifications.append(
421+
NotificationEntities(
422+
entities=[], # No entities - we're just wanting the parent List to notify as empty
423+
subscription=sub,
424+
notification_id=uuid4(),
425+
notification_type=NotificationType.ENTITY_CHANGED,
426+
batch_key=batch_key,
427+
pricing_reading_type=None,
428+
)
429+
)
413430

414431
# Finally time to enqueue the outgoing notifications
415432
logger.info(

src/envoy/server/mapper/sep2/pub_sub.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ def map_sites_to_response(
453453
scope: AggregatorRequestScope,
454454
notification_type: NotificationType,
455455
disable_registration: bool,
456+
poll_rate_seconds: int,
456457
) -> Notification:
457458
"""Turns a list of sites into a notification"""
458459
return Notification.model_validate(
@@ -462,6 +463,7 @@ def map_sites_to_response(
462463
"status": _map_to_notification_status(notification_type),
463464
"resource": {
464465
"type": XSI_TYPE_END_DEVICE_LIST,
466+
"pollRate": poll_rate_seconds,
465467
"all_": len(sites),
466468
"results": len(sites),
467469
"EndDevice": [EndDeviceMapper.map_to_response(scope, s, disable_registration, 0) for s in sites],

tests/integration/admin/test_notification.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from datetime import datetime
23
from decimal import Decimal
34
from http import HTTPStatus
@@ -26,6 +27,7 @@
2627
from sqlalchemy import delete, insert, select
2728

2829
from envoy.notification.task.transmit import HEADER_NOTIFICATION_ID
30+
from envoy.server.model.server import RuntimeServerConfig
2931
from envoy.server.model.subscription import Subscription, SubscriptionResource
3032
from envoy.server.model.tariff import PRICE_DECIMAL_POWER
3133

@@ -610,6 +612,55 @@ async def test_delete_site_with_active_subscription(
610612
), "Expected unique notification ids for each request"
611613

612614

615+
@pytest.mark.anyio
616+
async def test_update_server_config_edev_list_notification(
617+
admin_client_auth: AsyncClient, notifications_enabled: MockedAsyncClient, pg_base_config
618+
):
619+
"""Tests that updating server config generates subscription notifications for EndDeviceList"""
620+
621+
subscription1_uri = "http://my.example:542/uri"
622+
623+
async with generate_async_session(pg_base_config) as session:
624+
# Clear any other subs first
625+
await session.execute(delete(Subscription))
626+
627+
# Will pickup site notifications
628+
await session.execute(
629+
insert(Subscription).values(
630+
aggregator_id=1,
631+
changed_time=datetime.now(),
632+
resource_type=SubscriptionResource.SITE,
633+
resource_id=None,
634+
scoped_site_id=None,
635+
notification_uri=subscription1_uri,
636+
entity_limit=10,
637+
)
638+
)
639+
await session.commit()
640+
641+
# Update edev list config
642+
edev_list_poll_rate = 131009115
643+
config_request = generate_class_instance(
644+
RuntimeServerConfigRequest, optional_is_none=True, edevl_pollrate_seconds=edev_list_poll_rate
645+
)
646+
resp = await admin_client_auth.post(ServerConfigRuntimeUri, content=config_request.model_dump_json())
647+
assert resp.status_code == HTTPStatus.NO_CONTENT
648+
649+
# Give the notifications a chance to propagate
650+
assert await notifications_enabled.wait_for_n_requests(n=1, timeout_seconds=30)
651+
652+
# Sub 1 got one notification, sub 2 got the other
653+
assert notifications_enabled.call_count_by_method[HTTPMethod.GET] == 0
654+
assert notifications_enabled.call_count_by_method[HTTPMethod.POST] == 1
655+
assert notifications_enabled.call_count_by_method_uri[(HTTPMethod.POST, subscription1_uri)] == 1
656+
657+
assert all([HEADER_NOTIFICATION_ID in r.headers for r in notifications_enabled.logged_requests])
658+
assert len(set([r.headers[HEADER_NOTIFICATION_ID] for r in notifications_enabled.logged_requests])) == len(
659+
notifications_enabled.logged_requests
660+
), "Expected unique notification ids for each request"
661+
assert f'pollRate="{edev_list_poll_rate}"' in notifications_enabled.logged_requests[0].content
662+
663+
613664
@pytest.mark.anyio
614665
async def test_update_server_config_fsa_notification(
615666
admin_client_auth: AsyncClient, notifications_enabled: MockedAsyncClient, pg_base_config
@@ -671,6 +722,68 @@ async def test_update_server_config_fsa_notification(
671722
), "Expected unique notification ids for each request"
672723

673724

725+
@pytest.mark.parametrize("none_fsa_value", [True, False])
726+
@pytest.mark.anyio
727+
async def test_update_server_config_fsa_notification_no_change(
728+
admin_client_auth: AsyncClient, notifications_enabled: MockedAsyncClient, pg_base_config, none_fsa_value: bool
729+
):
730+
"""Tests that updating server config (with no changed value for FSA) generates 0 notifications"""
731+
732+
subscription1_uri = "http://my.example:542/uri"
733+
fsa_poll_rate = 123
734+
735+
async with generate_async_session(pg_base_config) as session:
736+
# Clear any other subs first
737+
await session.execute(delete(Subscription))
738+
739+
# Force the server FSA pollrate config to a known value
740+
await session.execute(delete(RuntimeServerConfig))
741+
await session.execute(
742+
insert(RuntimeServerConfig).values(
743+
changed_time=datetime.now(),
744+
dcap_pollrate_seconds=None,
745+
edevl_pollrate_seconds=None,
746+
fsal_pollrate_seconds=fsa_poll_rate,
747+
derpl_pollrate_seconds=None,
748+
derl_pollrate_seconds=None,
749+
mup_postrate_seconds=None,
750+
site_control_pow10_encoding=None,
751+
disable_edev_registration=False,
752+
)
753+
)
754+
755+
# Will pickup FSA notifications for site 2
756+
await session.execute(
757+
insert(Subscription).values(
758+
aggregator_id=1,
759+
changed_time=datetime.now(),
760+
resource_type=SubscriptionResource.FUNCTION_SET_ASSIGNMENTS,
761+
resource_id=None,
762+
scoped_site_id=2,
763+
notification_uri=subscription1_uri,
764+
entity_limit=10,
765+
)
766+
)
767+
768+
await session.commit()
769+
770+
# Update config with the subscriptions now live
771+
config_request = generate_class_instance(
772+
RuntimeServerConfigRequest,
773+
optional_is_none=True,
774+
disable_edev_registration=True,
775+
fsal_pollrate_seconds=None if none_fsa_value else fsa_poll_rate,
776+
)
777+
resp = await admin_client_auth.post(ServerConfigRuntimeUri, content=config_request.model_dump_json())
778+
assert resp.status_code == HTTPStatus.NO_CONTENT
779+
780+
# Give any notifications a chance to propagate
781+
await asyncio.sleep(3)
782+
783+
# No notifications should've been generated as we aren't actually changing any values associated with the FSA
784+
assert len(notifications_enabled.logged_requests) == 0
785+
786+
674787
@pytest.mark.anyio
675788
async def test_update_site_default_config_notification(
676789
admin_client_auth: AsyncClient, notifications_enabled: MockedAsyncClient, pg_base_config

tests/unit/admin/crud/test_aggregator.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import psycopg
44
import pytest
5-
from assertical.fixtures.postgres import generate_async_session
65
import sqlalchemy as sa
6+
from assertical.fixtures.postgres import generate_async_session
77
from sqlalchemy.exc import IntegrityError
88

9+
from envoy.admin import crud
910
from envoy.admin.crud.certificate import select_all_certificates_for_aggregator
1011
from envoy.server import model
11-
from envoy.admin import crud
1212
from envoy.server.crud.aggregator import select_aggregator
1313

1414

@@ -27,8 +27,10 @@ async def test_count_all_aggregators_empty(pg_empty_config: psycopg.Connection)
2727
@pytest.mark.parametrize(
2828
"start, limit, expected_aggregator_ids, expected_domain_ids",
2929
[
30+
(None, None, [1, 2, 3], [1, 2, 3, 4]),
3031
(0, 500, [1, 2, 3], [1, 2, 3, 4]),
3132
(0, 1, [1], [1, 4]),
33+
(None, 1, [1], [1, 4]),
3234
(1, 1, [2], [2]),
3335
(2, 1, [3], [3]),
3436
(3, 1, [], []),

tests/unit/server/mapper/sep2/test_pub_sub.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ def test_NotificationMapper_map_sites_to_response(notification_type: Notificatio
532532
DeviceOrAggregatorRequestScope, seed=1001, href_prefix="/custom/prefix"
533533
)
534534

535-
notification = NotificationMapper.map_sites_to_response([site1, site2], sub, scope, notification_type, False)
535+
notification = NotificationMapper.map_sites_to_response([site1, site2], sub, scope, notification_type, False, 9876)
536536
assert isinstance(notification, Notification)
537537
assert notification.subscribedResource.startswith("/custom/prefix")
538538
assert EndDeviceListUri in notification.subscribedResource
@@ -544,6 +544,7 @@ def test_NotificationMapper_map_sites_to_response(notification_type: Notificatio
544544
assert notification.status == NotificationStatus.DEFAULT
545545

546546
assert notification.resource.type == XSI_TYPE_END_DEVICE_LIST
547+
assert notification.resource.pollRate == 9876
547548
assert_list_type(EndDeviceResponse, notification.resource.EndDevice, count=2)
548549
assert_entity_hrefs_contain_entity_id_and_prefix(
549550
[e.href for e in notification.resource.EndDevice], [site1.site_id, site2.site_id], scope.href_prefix

0 commit comments

Comments
 (0)