|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import logging |
| 4 | +from collections import Counter |
| 5 | +from typing import TYPE_CHECKING |
| 6 | + |
| 7 | +from steam_library_manager.services.chart_data import ChartSlice |
| 8 | +from steam_library_manager.core.game import is_real_game |
| 9 | + |
| 10 | +if TYPE_CHECKING: |
| 11 | + from steam_library_manager.core.game import Game |
| 12 | + from steam_library_manager.core.database import Database |
| 13 | + |
| 14 | +__all__ = ["StatisticsService"] |
| 15 | + |
| 16 | +logger = logging.getLogger("steamlibmgr.statistics") |
| 17 | + |
| 18 | + |
| 19 | +class StatisticsService: |
| 20 | + """Aggregiert Game-Daten fuer Charts.""" |
| 21 | + |
| 22 | + def __init__(self, games: list[Game], database: Database | None = None): |
| 23 | + self._games = games |
| 24 | + self._real = [g for g in games if is_real_game(g)] |
| 25 | + self._db = database |
| 26 | + |
| 27 | + def overview(self) -> dict[str, int | float]: |
| 28 | + total = len(self._real) |
| 29 | + inst = sum(1 for g in self._real if g.installed) |
| 30 | + pt = sum(g.playtime_minutes for g in self._real) |
| 31 | + never = sum(1 for g in self._real if g.playtime_minutes == 0) |
| 32 | + perfect = sum(1 for g in self._real if g.achievement_perfect) |
| 33 | + return { |
| 34 | + "total": total, |
| 35 | + "installed": inst, |
| 36 | + "not_installed": total - inst, |
| 37 | + "playtime_hours": round(pt / 60, 1), |
| 38 | + "never_played": never, |
| 39 | + "never_played_pct": round(never / total * 100, 1) if total else 0, |
| 40 | + "perfect_games": perfect, |
| 41 | + "avg_playtime_hours": round(pt / 60 / total, 1) if total else 0, |
| 42 | + } |
| 43 | + |
| 44 | + def genres_by_count(self, top_n: int = 15) -> list[ChartSlice]: |
| 45 | + return self._counter_to_slices(self._count_list_field("genres"), top_n) |
| 46 | + |
| 47 | + def genres_by_playtime(self, top_n: int = 15) -> list[ChartSlice]: |
| 48 | + cnt: Counter = Counter() |
| 49 | + for g in self._real: |
| 50 | + for genre in g.genres: |
| 51 | + cnt[genre] += g.playtime_minutes |
| 52 | + for k in cnt: |
| 53 | + cnt[k] = round(cnt[k] / 60, 1) |
| 54 | + return self._counter_to_slices(cnt, top_n) |
| 55 | + |
| 56 | + def platforms(self) -> list[ChartSlice]: |
| 57 | + return self._counter_to_slices(self._count_list_field("platforms")) |
| 58 | + |
| 59 | + def deck_status(self) -> list[ChartSlice]: |
| 60 | + cnt = Counter(g.steam_deck_status or "unknown" for g in self._real) |
| 61 | + return self._counter_to_slices(cnt) |
| 62 | + |
| 63 | + def protondb_tiers(self) -> list[ChartSlice]: |
| 64 | + cnt = Counter(g.proton_db_rating or "unknown" for g in self._real) |
| 65 | + return self._counter_to_slices(cnt) |
| 66 | + |
| 67 | + def achievement_buckets(self) -> list[ChartSlice]: |
| 68 | + buckets = {"perfect": 0, "almost": 0, "progress": 0, "started": 0, "zero": 0, "none": 0} |
| 69 | + for g in self._real: |
| 70 | + if g.achievement_total == 0: |
| 71 | + buckets["none"] += 1 |
| 72 | + elif g.achievement_perfect: |
| 73 | + buckets["perfect"] += 1 |
| 74 | + elif g.achievement_percentage >= 75: |
| 75 | + buckets["almost"] += 1 |
| 76 | + elif g.achievement_percentage >= 25: |
| 77 | + buckets["progress"] += 1 |
| 78 | + elif g.achievement_percentage > 0: |
| 79 | + buckets["started"] += 1 |
| 80 | + else: |
| 81 | + buckets["zero"] += 1 |
| 82 | + return [ChartSlice(label=k, value=v) for k, v in buckets.items() if v > 0] |
| 83 | + |
| 84 | + def perfect_games(self) -> list[Game]: |
| 85 | + return sorted( |
| 86 | + [g for g in self._real if g.achievement_perfect], |
| 87 | + key=lambda g: g.name.lower(), |
| 88 | + ) |
| 89 | + |
| 90 | + def almost_done(self, threshold: float = 80.0) -> list[Game]: |
| 91 | + return sorted( |
| 92 | + [ |
| 93 | + g |
| 94 | + for g in self._real |
| 95 | + if g.achievement_total > 0 and g.achievement_percentage >= threshold and not g.achievement_perfect |
| 96 | + ], |
| 97 | + key=lambda g: g.achievement_percentage, |
| 98 | + reverse=True, |
| 99 | + ) |
| 100 | + |
| 101 | + def rare_achievements(self, threshold: float = 10.0) -> dict[str, int]: |
| 102 | + if not self._db: |
| 103 | + return {"rare_count": 0, "ultra_rare_count": 0, "total_unlocked": 0} |
| 104 | + try: |
| 105 | + cursor = self._db.conn.execute( |
| 106 | + "SELECT COUNT(*) FROM achievements WHERE is_unlocked = 1 AND rarity_percentage < ?", |
| 107 | + (threshold,), |
| 108 | + ) |
| 109 | + rare = cursor.fetchone()[0] or 0 |
| 110 | + cursor = self._db.conn.execute( |
| 111 | + "SELECT COUNT(*) FROM achievements WHERE is_unlocked = 1 AND rarity_percentage < 1.0" |
| 112 | + ) |
| 113 | + ultra = cursor.fetchone()[0] or 0 |
| 114 | + cursor = self._db.conn.execute("SELECT COUNT(*) FROM achievements WHERE is_unlocked = 1") |
| 115 | + total = cursor.fetchone()[0] or 0 |
| 116 | + return {"rare_count": rare, "ultra_rare_count": ultra, "total_unlocked": total} |
| 117 | + except Exception: |
| 118 | + return {"rare_count": 0, "ultra_rare_count": 0, "total_unlocked": 0} |
| 119 | + |
| 120 | + def achievement_rarity_buckets(self) -> list[ChartSlice]: |
| 121 | + if not self._db: |
| 122 | + return [] |
| 123 | + try: |
| 124 | + buckets = {"ultra_rare": 0, "rare": 0, "uncommon": 0, "common": 0} |
| 125 | + cursor = self._db.conn.execute("SELECT rarity_percentage FROM achievements WHERE is_unlocked = 1") |
| 126 | + for row in cursor: |
| 127 | + pct = row[0] or 100.0 |
| 128 | + if pct < 1.0: |
| 129 | + buckets["ultra_rare"] += 1 |
| 130 | + elif pct < 10.0: |
| 131 | + buckets["rare"] += 1 |
| 132 | + elif pct < 50.0: |
| 133 | + buckets["uncommon"] += 1 |
| 134 | + else: |
| 135 | + buckets["common"] += 1 |
| 136 | + return [ChartSlice(label=k, value=v) for k, v in buckets.items() if v > 0] |
| 137 | + except Exception: |
| 138 | + return [] |
| 139 | + |
| 140 | + def top_played(self, n: int = 10) -> list[ChartSlice]: |
| 141 | + top = sorted(self._real, key=lambda g: g.playtime_minutes, reverse=True)[:n] |
| 142 | + return [ChartSlice(label=g.name, value=round(g.playtime_minutes / 60, 1)) for g in top] |
| 143 | + |
| 144 | + def shame_pile(self) -> dict[str, int]: |
| 145 | + never = [g for g in self._real if g.playtime_minutes == 0] |
| 146 | + return { |
| 147 | + "total": len(never), |
| 148 | + "installed": sum(1 for g in never if g.installed), |
| 149 | + "not_installed": sum(1 for g in never if not g.installed), |
| 150 | + } |
| 151 | + |
| 152 | + def playtime_buckets(self) -> list[ChartSlice]: |
| 153 | + buckets = {"0h": 0, "lt_1h": 0, "1_5h": 0, "5_20h": 0, "20_100h": 0, "100h_plus": 0} |
| 154 | + for g in self._real: |
| 155 | + hrs = g.playtime_minutes / 60 |
| 156 | + if hrs == 0: |
| 157 | + buckets["0h"] += 1 |
| 158 | + elif hrs < 1: |
| 159 | + buckets["lt_1h"] += 1 |
| 160 | + elif hrs < 5: |
| 161 | + buckets["1_5h"] += 1 |
| 162 | + elif hrs < 20: |
| 163 | + buckets["5_20h"] += 1 |
| 164 | + elif hrs < 100: |
| 165 | + buckets["20_100h"] += 1 |
| 166 | + else: |
| 167 | + buckets["100h_plus"] += 1 |
| 168 | + return [ChartSlice(label=k, value=v) for k, v in buckets.items() if v > 0] |
| 169 | + |
| 170 | + def pegi_distribution(self) -> list[ChartSlice]: |
| 171 | + cnt = Counter(g.pegi_rating or "none" for g in self._real) |
| 172 | + return self._counter_to_slices(cnt) |
| 173 | + |
| 174 | + def review_buckets(self) -> list[ChartSlice]: |
| 175 | + buckets = {"op_95": 0, "vp_80": 0, "pos_70": 0, "mixed_40": 0, "neg_0": 0, "no_reviews": 0} |
| 176 | + for g in self._real: |
| 177 | + pct = g.review_percentage |
| 178 | + if g.review_count == 0: |
| 179 | + buckets["no_reviews"] += 1 |
| 180 | + elif pct >= 95: |
| 181 | + buckets["op_95"] += 1 |
| 182 | + elif pct >= 80: |
| 183 | + buckets["vp_80"] += 1 |
| 184 | + elif pct >= 70: |
| 185 | + buckets["pos_70"] += 1 |
| 186 | + elif pct >= 40: |
| 187 | + buckets["mixed_40"] += 1 |
| 188 | + else: |
| 189 | + buckets["neg_0"] += 1 |
| 190 | + return [ChartSlice(label=k, value=v) for k, v in buckets.items() if v > 0] |
| 191 | + |
| 192 | + def top_developers(self, n: int = 10) -> list[ChartSlice]: |
| 193 | + cnt: Counter = Counter() |
| 194 | + for g in self._real: |
| 195 | + if g.developer: |
| 196 | + for dev in g.developer.split(", "): |
| 197 | + dev = dev.strip() |
| 198 | + if dev: |
| 199 | + cnt[dev] += 1 |
| 200 | + return self._counter_to_slices(cnt, n) |
| 201 | + |
| 202 | + def hltb_buckets(self) -> list[ChartSlice]: |
| 203 | + buckets = { |
| 204 | + "lt_5h": 0, |
| 205 | + "5_10h": 0, |
| 206 | + "10_20h": 0, |
| 207 | + "20_40h": 0, |
| 208 | + "40_100h": 0, |
| 209 | + "100h_plus": 0, |
| 210 | + "no_data": 0, |
| 211 | + } |
| 212 | + for g in self._real: |
| 213 | + hrs = g.hltb_main_story |
| 214 | + if hrs <= 0: |
| 215 | + buckets["no_data"] += 1 |
| 216 | + elif hrs < 5: |
| 217 | + buckets["lt_5h"] += 1 |
| 218 | + elif hrs < 10: |
| 219 | + buckets["5_10h"] += 1 |
| 220 | + elif hrs < 20: |
| 221 | + buckets["10_20h"] += 1 |
| 222 | + elif hrs < 40: |
| 223 | + buckets["20_40h"] += 1 |
| 224 | + elif hrs < 100: |
| 225 | + buckets["40_100h"] += 1 |
| 226 | + else: |
| 227 | + buckets["100h_plus"] += 1 |
| 228 | + return [ChartSlice(label=k, value=v) for k, v in buckets.items() if v > 0] |
| 229 | + |
| 230 | + def _count_list_field(self, field: str) -> Counter: |
| 231 | + cnt: Counter = Counter() |
| 232 | + for g in self._real: |
| 233 | + vals = getattr(g, field, []) |
| 234 | + if isinstance(vals, list): |
| 235 | + for v in vals: |
| 236 | + cnt[str(v).capitalize()] += 1 |
| 237 | + return cnt |
| 238 | + |
| 239 | + @staticmethod |
| 240 | + def _counter_to_slices(cnt: Counter, top_n: int = 0) -> list[ChartSlice]: |
| 241 | + items = cnt.most_common(top_n) if top_n > 0 else cnt.most_common() |
| 242 | + return [ChartSlice(label=lbl, value=val) for lbl, val in items] |
0 commit comments