|
1 | 1 | # -*- coding: utf-8 -*- |
| 2 | +from collections import deque |
2 | 3 | from datetime import datetime |
| 4 | +from typing import Tuple |
3 | 5 | from urllib.parse import parse_qsl, quote, quote_plus, unquote, urlencode, urlsplit |
4 | 6 |
|
5 | 7 | from plexapi import media, settings, utils |
@@ -61,63 +63,95 @@ def defaultAdvanced(self): |
61 | 63 |
|
62 | 64 |
|
63 | 65 | class SmartFilterMixin: |
64 | | - """ Mixing for Plex objects that can have smart filters. """ |
| 66 | + """ Mixin for Plex objects that can have smart filters. """ |
| 67 | + |
| 68 | + def _parseFilterGroups( |
| 69 | + self, feed: "deque[Tuple[str, str]]", returnOn: "set[str]|None" = None |
| 70 | + ) -> dict: |
| 71 | + """ Parse filter groups from input lines between push and pop. """ |
| 72 | + currentFiltersStack: list[dict] = [] |
| 73 | + operatorForStack = None |
| 74 | + if returnOn is None: |
| 75 | + returnOn = set("pop") |
| 76 | + else: |
| 77 | + returnOn.add("pop") |
| 78 | + allowedLogicalOperators = ["and", "or"] # first is the default |
| 79 | + |
| 80 | + while feed: |
| 81 | + key, value = feed.popleft() # consume the first item |
| 82 | + if key == "push": |
| 83 | + # recurse and add the result to the current stack |
| 84 | + currentFiltersStack.append( |
| 85 | + self._parseFilterGroups(feed, returnOn) |
| 86 | + ) |
| 87 | + elif key in returnOn: |
| 88 | + # stop iterating and return the current stack |
| 89 | + if not key == "pop": |
| 90 | + feed.appendleft((key, value)) # put the item back |
| 91 | + break |
| 92 | + |
| 93 | + elif key in allowedLogicalOperators: |
| 94 | + # set the operator |
| 95 | + if operatorForStack and not operatorForStack == key: |
| 96 | + raise ValueError( |
| 97 | + "cannot have different logical operators for the same" |
| 98 | + " filter group" |
| 99 | + ) |
| 100 | + operatorForStack = key |
| 101 | + |
| 102 | + else: |
| 103 | + # add the key value pair to the current filter |
| 104 | + currentFiltersStack.append({key: value}) |
| 105 | + |
| 106 | + if not operatorForStack and len(currentFiltersStack) > 1: |
| 107 | + # consider 'and' as the default operator |
| 108 | + operatorForStack = allowedLogicalOperators[0] |
| 109 | + |
| 110 | + if operatorForStack: |
| 111 | + return {operatorForStack: currentFiltersStack} |
| 112 | + return currentFiltersStack.pop() |
| 113 | + |
| 114 | + def _parseQueryFeed(self, feed: "deque[Tuple[str, str]]") -> dict: |
| 115 | + """ Parse the query string into a dict. """ |
| 116 | + filtersDict = {} |
| 117 | + special_keys = {"type", "sort"} |
| 118 | + integer_keys = {"includeGuids", "limit"} |
| 119 | + reserved_keys = special_keys | integer_keys |
| 120 | + while feed: |
| 121 | + key, value = feed.popleft() |
| 122 | + if key in integer_keys: |
| 123 | + filtersDict[key] = int(value) |
| 124 | + elif key == "type": |
| 125 | + filtersDict["libtype"] = utils.reverseSearchType(value) |
| 126 | + elif key == "sort": |
| 127 | + filtersDict["sort"] = value.split(",") |
| 128 | + else: |
| 129 | + feed.appendleft((key, value)) # put the item back |
| 130 | + filter_group = self._parseFilterGroups( |
| 131 | + feed, returnOn=reserved_keys |
| 132 | + ) |
| 133 | + if "filters" in filtersDict: |
| 134 | + filtersDict["filters"] = { |
| 135 | + "and": [filtersDict["filters"], filter_group] |
| 136 | + } |
| 137 | + else: |
| 138 | + filtersDict["filters"] = filter_group |
| 139 | + |
| 140 | + return filtersDict |
65 | 141 |
|
66 | 142 | def _parseFilters(self, content): |
67 | 143 | """ Parse the content string and returns the filter dict. """ |
68 | 144 | content = urlsplit(unquote(content)) |
69 | | - filters = {} |
70 | | - filterOp = 'and' |
71 | | - filterGroups = [[]] |
| 145 | + feed = deque() |
72 | 146 |
|
73 | 147 | for key, value in parse_qsl(content.query): |
74 | 148 | # Move = sign to key when operator is == |
75 | | - if value.startswith('='): |
76 | | - key += '=' |
77 | | - value = value[1:] |
78 | | - |
79 | | - if key == 'includeGuids': |
80 | | - filters['includeGuids'] = int(value) |
81 | | - elif key == 'type': |
82 | | - filters['libtype'] = utils.reverseSearchType(value) |
83 | | - elif key == 'sort': |
84 | | - filters['sort'] = value.split(',') |
85 | | - elif key == 'limit': |
86 | | - filters['limit'] = int(value) |
87 | | - elif key == 'push': |
88 | | - filterGroups[-1].append([]) |
89 | | - filterGroups.append(filterGroups[-1][-1]) |
90 | | - elif key == 'and': |
91 | | - filterOp = 'and' |
92 | | - elif key == 'or': |
93 | | - filterOp = 'or' |
94 | | - elif key == 'pop': |
95 | | - filterGroups[-1].insert(0, filterOp) |
96 | | - filterGroups.pop() |
97 | | - else: |
98 | | - filterGroups[-1].append({key: value}) |
99 | | - |
100 | | - if filterGroups: |
101 | | - filters['filters'] = self._formatFilterGroups(filterGroups.pop()) |
102 | | - return filters |
103 | | - |
104 | | - def _formatFilterGroups(self, groups): |
105 | | - """ Formats the filter groups into the advanced search rules. """ |
106 | | - if len(groups) == 1 and isinstance(groups[0], list): |
107 | | - groups = groups.pop() |
108 | | - |
109 | | - filterOp = 'and' |
110 | | - rules = [] |
| 149 | + if value.startswith("="): |
| 150 | + key, value = f"{key}=", value[1:] |
111 | 151 |
|
112 | | - for g in groups: |
113 | | - if isinstance(g, list): |
114 | | - rules.append(self._formatFilterGroups(g)) |
115 | | - elif isinstance(g, dict): |
116 | | - rules.append(g) |
117 | | - elif g in {'and', 'or'}: |
118 | | - filterOp = g |
| 152 | + feed.append((key, value)) |
119 | 153 |
|
120 | | - return {filterOp: rules} |
| 154 | + return self._parseQueryFeed(feed) |
121 | 155 |
|
122 | 156 |
|
123 | 157 | class SplitMergeMixin: |
|
0 commit comments