Skip to content

Commit 2623459

Browse files
authored
Merge pull request #84 from ogajduse/feat/enhance-sensor-class-structure
2 parents 97fd95c + d5c0f90 commit 2623459

File tree

2 files changed

+80
-43
lines changed

2 files changed

+80
-43
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ repos:
2121
rev: "v1.4.1"
2222
hooks:
2323
- id: mypy
24-
additional_dependencies: [homeassistant-stubs, voluptuous-stubs, types-python-dateutil]
24+
additional_dependencies: [homeassistant-stubs, voluptuous-stubs, types-python-dateutil, types-PyYAML]
2525
- repo: https://github.com/astral-sh/ruff-pre-commit
2626
# Ruff version.
2727
rev: v0.0.278

custom_components/feedparser/sensor.py

Lines changed: 79 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
"""Feedparser sensor."""
22
from __future__ import annotations
33

4+
import email.utils
45
import logging
5-
from datetime import datetime, timedelta
6+
from datetime import datetime, timedelta, timezone
67
from typing import TYPE_CHECKING
78

89
import feedparser # type: ignore[import]
@@ -32,16 +33,18 @@
3233
CONF_EXCLUSIONS = "exclusions"
3334
CONF_SHOW_TOPN = "show_topn"
3435

36+
DEFAULT_DATE_FORMAT = "%a, %b %d %I:%M %p"
3537
DEFAULT_SCAN_INTERVAL = timedelta(hours=1)
3638
DEFAULT_THUMBNAIL = "https://www.home-assistant.io/images/favicon-192x192-full.png"
39+
DEFAULT_TOPN = 9999
3740

3841
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
3942
{
4043
vol.Required(CONF_NAME): cv.string,
4144
vol.Required(CONF_FEED_URL): cv.string,
42-
vol.Required(CONF_DATE_FORMAT, default="%a, %b %d %I:%M %p"): cv.string,
45+
vol.Required(CONF_DATE_FORMAT, default=DEFAULT_DATE_FORMAT): cv.string,
4346
vol.Optional(CONF_LOCAL_TIME, default=False): cv.boolean,
44-
vol.Optional(CONF_SHOW_TOPN, default=9999): cv.positive_int,
47+
vol.Optional(CONF_SHOW_TOPN, default=DEFAULT_TOPN): cv.positive_int,
4548
vol.Optional(CONF_INCLUSIONS, default=[]): vol.All(cv.ensure_list, [cv.string]),
4649
vol.Optional(CONF_EXCLUSIONS, default=[]): vol.All(cv.ensure_list, [cv.string]),
4750
vol.Optional(CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL): cv.time_period,
@@ -114,45 +117,79 @@ def update(self: FeedParserSensor) -> None:
114117
if len(parsed_feed.entries) > self._show_topn
115118
else len(parsed_feed.entries)
116119
)
117-
self._entries = []
118-
119-
for entry in parsed_feed.entries[: self._attr_state]:
120-
entry_value = {}
121-
122-
for key, value in entry.items():
123-
if (
124-
(self._inclusions and key not in self._inclusions)
125-
or ("parsed" in key)
126-
or (key in self._exclusions)
127-
):
128-
continue
129-
if key in ["published", "updated", "created", "expired"]:
130-
time: datetime = parser.parse(value)
131-
if self._local_time:
132-
time = dt.as_local(time)
133-
entry_value[key] = time.strftime(self._date_format)
134-
else:
135-
entry_value[key] = value
136-
137-
if "image" in self._inclusions and "image" not in entry_value.keys():
138-
if "enclosures" in entry:
139-
images = [
140-
enc
141-
for enc in entry["enclosures"]
142-
if enc.type.startswith("image/")
143-
]
144-
else:
145-
images = []
146-
if images:
147-
entry_value["image"] = images[0][
148-
"href"
149-
] # pick the first image found
150-
else:
151-
entry_value[
152-
"image"
153-
] = DEFAULT_THUMBNAIL # use default image if no image found
154-
155-
self._entries.append(entry_value)
120+
self._entries.extend(self._generate_entries(parsed_feed))
121+
122+
def _generate_entries(
123+
self: FeedParserSensor, parsed_feed: FeedParserDict
124+
) -> list[dict[str, str]]:
125+
return [
126+
self._generate_sensor_entry(feed_entry)
127+
for feed_entry in parsed_feed.entries[
128+
: self.native_value # type: ignore[misc]
129+
]
130+
]
131+
132+
def _generate_sensor_entry(
133+
self: FeedParserSensor, feed_entry: FeedParserDict
134+
) -> dict[str, str]:
135+
sensor_entry = {}
136+
for key, value in feed_entry.items():
137+
if (
138+
(self._inclusions and key not in self._inclusions)
139+
or ("parsed" in key)
140+
or (key in self._exclusions)
141+
):
142+
continue
143+
if key in ["published", "updated", "created", "expired"]:
144+
parsed_date: datetime = self._parse_date(value)
145+
sensor_entry[key] = parsed_date.strftime(self._date_format)
146+
else:
147+
sensor_entry[key] = value
148+
149+
self._process_image(feed_entry, sensor_entry)
150+
151+
return sensor_entry
152+
153+
def _parse_date(self: FeedParserSensor, date: str) -> datetime:
154+
try:
155+
parsed_time: datetime = email.utils.parsedate_to_datetime(date)
156+
except ValueError:
157+
_LOGGER.warning(
158+
(
159+
"Unable to parse RFC-822 date from %s. "
160+
"This could be caused by incorrect pubDate format "
161+
"in the RSS feed or due to a leapp second"
162+
),
163+
date,
164+
)
165+
parsed_time = parser.parse(date)
166+
if not parsed_time.tzname():
167+
# replace tzinfo with UTC offset if tzinfo does not contain a TZ name
168+
parsed_time = parsed_time.replace(
169+
tzinfo=timezone(parsed_time.utcoffset()) # type: ignore[arg-type]
170+
)
171+
if self._local_time:
172+
parsed_time = dt.as_local(parsed_time)
173+
return parsed_time
174+
175+
def _process_image(
176+
self: FeedParserSensor, feed_entry: FeedParserDict, sensor_entry: dict[str, str]
177+
) -> None:
178+
if "image" in self._inclusions and "image" not in sensor_entry.keys():
179+
if "enclosures" in feed_entry:
180+
images = [
181+
enc
182+
for enc in feed_entry["enclosures"]
183+
if enc.type.startswith("image/")
184+
]
185+
else:
186+
images = []
187+
if images:
188+
sensor_entry["image"] = images[0]["href"] # pick the first image found
189+
else:
190+
sensor_entry[
191+
"image"
192+
] = DEFAULT_THUMBNAIL # use default image if no image found
156193

157194
@property
158195
def extra_state_attributes(self: FeedParserSensor) -> dict[str, list]:

0 commit comments

Comments
 (0)