diff --git a/usr/lib/hypnotix/hypnotix.py b/usr/lib/hypnotix/hypnotix.py
index 8f1c58a7..17d5c78a 100755
--- a/usr/lib/hypnotix/hypnotix.py
+++ b/usr/lib/hypnotix/hypnotix.py
@@ -8,8 +8,16 @@
import traceback
import warnings
import subprocess
+import tempfile
+import requests
+import gzip
+import re
+import base64
+import pickle
+import xml.etree.ElementTree as xmlET
from functools import partial
from pathlib import Path
+from datetime import datetime, date, timedelta, timezone
# Force X11 on a Wayland session
if "WAYLAND_DISPLAY" in os.environ:
@@ -452,6 +460,7 @@ def add_badge(self, word, box, added_words):
print(e)
def show_groups(self, widget, content_type):
+ self.load_epg(self.active_provider.epg)
self.content_type = content_type
self.navigate_to("categories_page")
for child in self.categories_flowbox.get_children():
@@ -514,6 +523,7 @@ def on_category_button_clicked(self, widget, group):
self.show_vod(self.active_provider.series)
def show_favorites(self, widget=None):
+ self.load_epg(self.settings.get_string("favorites-epg"))
self.content_type = TV_GROUP
channels = []
for line in self.favorite_data:
@@ -542,6 +552,8 @@ def show_channels(self, channels, favorites=False):
self.download_channel_logos(logos_to_refresh)
else:
self.sidebar.hide()
+ self.epg_counter = {"channel": "", "idx": -1}
+ self.epg_timestamp = 0
def show_vod(self, items):
logos_to_refresh = []
@@ -852,6 +864,8 @@ def on_favorite_button_toggled(self, widget):
if widget.get_active() and data not in self.favorite_data:
print (f"Adding {name} to favorites")
self.favorite_data.append(data)
+ current_epg = self.active_provider.epg
+ self.settings.set_string("favorites-epg", (self.settings.get_string("favorites-epg").replace(current_epg, "") + " " + current_epg))
elif widget.get_active() == False and data in self.favorite_data:
print (f"Removing {name} from favorites")
self.favorite_data.remove(data)
@@ -875,6 +889,8 @@ def on_next_channel(self):
@async_function
def play_async(self, channel):
if self.mpv is not None:
+ self.epg_counter["idx"] = -1
+ self.mpv.command("show-text", "", 1)
self.mpv.stop()
self.mpv.pause = False
print("CHANNEL: '%s' (%s)" % (channel.name, channel.url))
@@ -1273,6 +1289,8 @@ def set_provider_type(self, type_id):
visible_widgets.append(self.path_entry)
visible_widgets.append(self.path_label)
visible_widgets.append(self.browse_button)
+ visible_widgets.append(self.epg_entry)
+ visible_widgets.append(self.epg_label)
elif type_id == PROVIDER_TYPE_XTREAM:
visible_widgets.append(self.url_entry)
visible_widgets.append(self.url_label)
@@ -1448,7 +1466,42 @@ def close(w, res):
def on_menu_quit(self, widget):
self.application.quit()
-
+
+ @async_function
+ def load_epg(self, epg_urls):
+ self.status_label.set_text("Loading EPG...")
+ self.status_label.show()
+ self.epg = None
+ def get_cached_epg_path(urls):
+ cached_epg_name = base64.urlsafe_b64encode((date.today().isoformat() + urls.replace(" ","")).encode()).decode()
+ return os.path.join(tempfile.gettempdir(), cached_epg_name)
+ cached_epg_path = get_cached_epg_path(epg_urls)
+ if os.path.exists(cached_epg_path):
+ with gzip.open(cached_epg_path, 'rb') as f:
+ self.epg = pickle.load(f)
+ elif (epg_urls != ""):
+ urls = ""
+ for e in epg_urls.split():
+ try:
+ response = requests.get(e)
+ with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
+ tmp_file.write(response.content)
+ temp_file_path = tmp_file.name
+ with gzip.open(temp_file_path, 'rb') as f:
+ ungzip = f.read().decode('utf-8')
+ epg = xmlET.fromstring(ungzip)
+ if self.epg is None:
+ self.epg = epg
+ else:
+ for item in epg:
+ self.epg.append(item)
+ urls += e
+ except:
+ pass
+ with gzip.open(get_cached_epg_path(urls), 'wb') as f:
+ pickle.dump(self.epg, f)
+ self.status_label.hide()
+
def on_key_press_event(self, widget, event):
if isinstance(widget.get_focus(), Gtk.Entry):
return False
@@ -1460,8 +1513,46 @@ def on_key_press_event(self, widget, event):
# Bool of Control or Shift modifier states
ctrl = modifier == Gdk.ModifierType.CONTROL_MASK
shift = modifier == Gdk.ModifierType.SHIFT_MASK
-
- if ctrl and event.keyval == Gdk.KEY_r:
+
+ epg_duration = 6 # seconds
+
+ def chan_match(chan1, chan2):
+ # discard digits at the beginning
+ chan1 = re.sub(r'^\d+', '', chan1)
+ chan2 = re.sub(r'^\d+', '', chan2)
+ # discard useless words
+ regex = r"\b(4K|HD)\b"
+ chan1 = re.sub(regex, "", chan1, flags=re.IGNORECASE)
+ chan2 = re.sub(regex, "", chan2, flags=re.IGNORECASE)
+ # normalize
+ chan1 = chan1.lower().replace(" ","")
+ chan1 = ''.join(filter(str.isalnum, chan1))
+ chan2 = chan2.lower().replace(" ","")
+ chan2 = ''.join(filter(str.isalnum, chan2))
+ return (chan1 in chan2 or chan2 in chan1)
+
+ if event.keyval == Gdk.KEY_g and not isinstance(widget.get_focus(), Gtk.Entry):
+ dateFormat = "%Y%m%d%H%M%S"
+ timeFormat = "%H:%M"
+ hoursOffset = 0 - int(datetime.now().astimezone().utcoffset().total_seconds() / 3600)
+ targetDatetime = (datetime.now() + timedelta(hours=hoursOffset)).timestamp()
+ if self.active_channel.name != self.epg_counter["channel"]:
+ self.epg_counter = {"channel": self.active_channel.name, "idx": -1 }
+ try:
+ channelEPG = [p for p in self.epg.findall("programme") if chan_match(p.attrib["channel"], self.active_channel.name)]
+ onair = [p for p in channelEPG if (tstart := datetime.strptime(p.attrib["start"].split()[0], dateFormat).timestamp()) <= targetDatetime and (tstop := datetime.strptime(p.attrib["stop"].split()[0], dateFormat).timestamp()) >= targetDatetime and (tstop - tstart) < (3600 * 5)]
+ osd_counter = ""
+ if len(onair) > 1:
+ self.epg_counter["idx"] = (self.epg_counter["idx"] + 1) % len(onair)
+ osd_counter = " [" + str(self.epg_counter["idx"] + 1) + "/" + str(len(onair)) + "]"
+ onair = onair[self.epg_counter["idx"]]
+ onairTime = (datetime.strptime(onair.attrib["start"].split()[0], dateFormat) + timedelta(hours=(0 - hoursOffset))).strftime(timeFormat) + " - " + (datetime.strptime(onair.attrib["stop"].split()[0], dateFormat) + timedelta(hours=(0 - hoursOffset))).strftime(timeFormat)
+ onairText = onair.attrib["channel"] + osd_counter + "\n" + onair.find("title").text + "\n" + onairTime
+ except:
+ onairText = "(no info)"
+ self.mpv.command("show-text", onairText, (epg_duration * 1000))
+ self.epg_timestamp = datetime.now().timestamp()
+ elif ctrl and event.keyval == Gdk.KEY_r:
self.reload(page=None, refresh=True)
elif ctrl and event.keyval == Gdk.KEY_f:
if self.search_button.get_active():
@@ -1476,7 +1567,11 @@ def on_key_press_event(self, widget, event):
elif event.keyval == Gdk.KEY_F7:
self.borderless_mode()
elif event.keyval == Gdk.KEY_Escape:
- self.normal_mode()
+ if ((datetime.now().timestamp() - self.epg_timestamp) <= epg_duration):
+ self.mpv.command("show-text", "", 1)
+ self.epg_timestamp = 0
+ else:
+ self.normal_mode()
elif event.keyval == Gdk.KEY_BackSpace and not ctrl:
self.normal_mode()
self.on_go_back_button()
diff --git a/usr/share/glib-2.0/schemas/org.x.hypnotix.gschema.xml b/usr/share/glib-2.0/schemas/org.x.hypnotix.gschema.xml
index 0ffd7b3d..5e67bef6 100644
--- a/usr/share/glib-2.0/schemas/org.x.hypnotix.gschema.xml
+++ b/usr/share/glib-2.0/schemas/org.x.hypnotix.gschema.xml
@@ -21,6 +21,11 @@
Provider selected by default
+
+ ""
+ EPG urls for favorites
+
+
['Free-TV:::url:::https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8:::::::::']
Format: name:::type:::url(or path):::username:::password:::epg