Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ tasks/
instance/

# data/ is a Python package — only exclude non-code files
data/*.json
data/*.csv
data/*.db

Expand Down
198 changes: 103 additions & 95 deletions utils/wefax_stations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,44 @@
Loads and caches station data from data/wefax_stations.json. Provides
lookup by callsign and current-broadcast filtering based on UTC time.
"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path


from __future__ import annotations

import json
import logging
from datetime import datetime, timezone
from pathlib import Path

log = logging.getLogger(__name__)

_stations_cache: list[dict] | None = None
_stations_by_callsign: dict[str, dict] = {}
_VALID_FREQUENCY_REFERENCES = {'auto', 'carrier', 'dial'}
_VALID_FREQUENCY_REFERENCES = {"auto", "carrier", "dial"}
WEFAX_USB_ALIGNMENT_OFFSET_KHZ = 1.9

_STATIONS_PATH = Path(__file__).resolve().parent.parent / 'data' / 'wefax_stations.json'


def load_stations() -> list[dict]:
"""Load all WeFax stations from JSON, caching on first call."""
global _stations_cache, _stations_by_callsign

if _stations_cache is not None:
return _stations_cache

with open(_STATIONS_PATH) as f:
data = json.load(f)

_stations_cache = data.get('stations', [])
_stations_by_callsign = {s['callsign']: s for s in _stations_cache}
return _stations_cache


_STATIONS_PATH = Path(__file__).resolve().parent.parent / "data" / "wefax_stations.json"


def load_stations() -> list[dict]:
"""Load all WeFax stations from JSON, caching on first call."""
global _stations_cache, _stations_by_callsign

if _stations_cache is not None:
return _stations_cache

if not _STATIONS_PATH.exists():
log.warning("wefax_stations.json not found at %s", _STATIONS_PATH)
_stations_cache = []
return _stations_cache

with open(_STATIONS_PATH) as f:
data = json.load(f)

_stations_cache = data.get("stations", [])
_stations_by_callsign = {s["callsign"]: s for s in _stations_cache}
return _stations_cache


def get_station(callsign: str) -> dict | None:
"""Get a single station by callsign."""
load_stations()
Expand All @@ -41,38 +49,38 @@ def get_station(callsign: str) -> dict | None:

def _normalize_frequency_reference(value: str | None) -> str:
"""Normalize and validate frequency reference token."""
reference = str(value or 'auto').strip().lower()
reference = str(value or "auto").strip().lower()
if reference not in _VALID_FREQUENCY_REFERENCES:
choices = ', '.join(sorted(_VALID_FREQUENCY_REFERENCES))
raise ValueError(f'frequency_reference must be one of: {choices}')
choices = ", ".join(sorted(_VALID_FREQUENCY_REFERENCES))
raise ValueError(f"frequency_reference must be one of: {choices}")
return reference


def _station_frequency_reference(station: dict, listed_frequency_khz: float) -> str:
"""Infer whether a station frequency entry is carrier or already USB dial."""
for entry in station.get('frequencies', []):
for entry in station.get("frequencies", []):
try:
entry_khz = float(entry.get('khz'))
entry_khz = float(entry.get("khz"))
except (TypeError, ValueError):
continue
if abs(entry_khz - listed_frequency_khz) > 0.001:
continue
entry_ref = str(entry.get('reference', '')).strip().lower()
if entry_ref in ('carrier', 'dial'):
entry_ref = str(entry.get("reference", "")).strip().lower()
if entry_ref in ("carrier", "dial"):
return entry_ref

station_ref = str(station.get('frequency_reference', '')).strip().lower()
if station_ref in ('carrier', 'dial'):
station_ref = str(station.get("frequency_reference", "")).strip().lower()
if station_ref in ("carrier", "dial"):
return station_ref

# Most published marine WeFax channel lists are carrier frequencies.
return 'carrier'
return "carrier"


def resolve_tuning_frequency_khz(
listed_frequency_khz: float,
station_callsign: str = '',
frequency_reference: str = 'auto',
station_callsign: str = "",
frequency_reference: str = "auto",
) -> tuple[float, str, bool]:
"""Resolve listed frequency to the actual USB dial frequency.

Expand All @@ -86,75 +94,75 @@ def resolve_tuning_frequency_khz(
"""
listed = float(listed_frequency_khz)
if listed <= 0:
raise ValueError('frequency_khz must be greater than zero')
raise ValueError("frequency_khz must be greater than zero")

requested_ref = _normalize_frequency_reference(frequency_reference)
resolved_ref = requested_ref

if requested_ref == 'auto':
if requested_ref == "auto":
station = get_station(station_callsign) if station_callsign else None
if station:
resolved_ref = _station_frequency_reference(station, listed)
else:
# For ad-hoc frequencies (no station metadata), treat input as dial.
resolved_ref = 'dial'
resolved_ref = "dial"

if resolved_ref == 'carrier':
if resolved_ref == "carrier":
tuned = round(listed - WEFAX_USB_ALIGNMENT_OFFSET_KHZ, 3)
if tuned <= 0:
raise ValueError('frequency_khz too low after USB alignment offset')
raise ValueError("frequency_khz too low after USB alignment offset")
return tuned, resolved_ref, True

return listed, resolved_ref, False
def get_current_broadcasts(callsign: str) -> list[dict]:
"""Return schedule entries closest to the current UTC time.
Returns up to 3 entries: the most recent past broadcast and the
next two upcoming ones, annotated with ``minutes_until`` or
``minutes_ago`` relative to now.
"""
station = get_station(callsign)
if not station:
return []
now = datetime.now(timezone.utc)
current_minutes = now.hour * 60 + now.minute
schedule = station.get('schedule', [])
if not schedule:
return []
# Convert schedule times to minutes-since-midnight for comparison
entries: list[tuple[int, dict]] = []
for entry in schedule:
parts = entry['utc'].split(':')
mins = int(parts[0]) * 60 + int(parts[1])
entries.append((mins, entry))
entries.sort(key=lambda x: x[0])
# Find closest entries relative to now
results = []
for mins, entry in entries:
diff = mins - current_minutes
# Wrap around midnight
if diff < -720:
diff += 1440
elif diff > 720:
diff -= 1440
annotated = dict(entry)
if diff >= 0:
annotated['minutes_until'] = diff
else:
annotated['minutes_ago'] = abs(diff)
annotated['_sort_key'] = abs(diff)
results.append(annotated)
results.sort(key=lambda x: x['_sort_key'])
# Return 3 nearest entries, clean up sort key
for r in results:
r.pop('_sort_key', None)
return results[:3]


def get_current_broadcasts(callsign: str) -> list[dict]:
"""Return schedule entries closest to the current UTC time.

Returns up to 3 entries: the most recent past broadcast and the
next two upcoming ones, annotated with ``minutes_until`` or
``minutes_ago`` relative to now.
"""
station = get_station(callsign)
if not station:
return []

now = datetime.now(timezone.utc)
current_minutes = now.hour * 60 + now.minute

schedule = station.get("schedule", [])
if not schedule:
return []

# Convert schedule times to minutes-since-midnight for comparison
entries: list[tuple[int, dict]] = []
for entry in schedule:
parts = entry["utc"].split(":")
mins = int(parts[0]) * 60 + int(parts[1])
entries.append((mins, entry))
entries.sort(key=lambda x: x[0])

# Find closest entries relative to now
results = []
for mins, entry in entries:
diff = mins - current_minutes
# Wrap around midnight
if diff < -720:
diff += 1440
elif diff > 720:
diff -= 1440

annotated = dict(entry)
if diff >= 0:
annotated["minutes_until"] = diff
else:
annotated["minutes_ago"] = abs(diff)
annotated["_sort_key"] = abs(diff)
results.append(annotated)

results.sort(key=lambda x: x["_sort_key"])

# Return 3 nearest entries, clean up sort key
for r in results:
r.pop("_sort_key", None)
return results[:3]
Loading