-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathentity-normalization.py
More file actions
201 lines (148 loc) · 6.6 KB
/
entity-normalization.py
File metadata and controls
201 lines (148 loc) · 6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3
"""Classify entity phrases found in privacy-policy statements via an LLM and
normalize them into first-party / third-party groups, writing entities.jsonl
per work directory."""
import argparse
import json
import logging
import os
from itertools import chain
import litellm
import regex
from litellm import completion
from rapidfuzz import fuzz
# Project-local type declarations — presumably TypedDict/tuple aliases; confirm in type_hints.py.
from type_hints import PhraseIdentifier, PhraseNormalizationResult, Statement
# Cache LLM responses on disk so repeated runs do not re-issue identical API calls.
litellm.enable_cache(type="disk")
# Prompt template filled via the % operator with (policy excerpt, newline-joined
# entity list). The model is instructed to reply with a single JSON object
# mapping each entity name to one of four labels (see the in-prompt example);
# note that main() later folds "second party" and unknown labels into 'other'.
PROMPT = '''
### Instructions
You will be presented with a privacy policy excerpt and a list of entity names found in the text.
Your task is to classify each entity as one of:
- "first party", i.e., "we" in the privacy policy;
- "second party", i.e., "you" or "the user" in the privacy policy;
- "third party", i.e., any other data processors or recipients;
- "other", i.e., irrelevant phrases wrongly labeled as entities.
Return the classification result as a JSON object with the entity name as the key and the classification as the value, e.g.:
{
"Company Foo Ltd": "first party",
"our service": "first party",
"you": "second party",
"children under 13": "second party",
"our affiliated companies": "first party",
"Bar Media LLC": "third party",
"Company Baz Inc": "third party",
"third-party services": "third party",
"advertisement networks": "third party",
"personal data": "other"
}
### Privacy Policy Excerpt
%s
### Entities
%s
'''
class EntityMatcher:
    """Fuzzy entity (company) name matcher.

    Built from a JSON file of the shape
    ``{entity: {"aliases": [...], "domains": [...], "ngrams": {ngram: oov_flag}}}``
    (inferred from the fields read below — confirm against the data file).
    Matching is two-tier: an exact (case-insensitive) domain lookup, then a
    case-insensitive keyword scan over known ngrams.
    """

    def __init__(self, entity_info_file):
        """Load entity info from *entity_info_file* and precompile the keyword regex.

        All lookup keys are lowercased here because ``match_name`` matches
        case-insensitively and looks entries up via ``.lower()``; storing
        mixed-case keys would make a regex hit raise KeyError.
        """
        with open(entity_info_file, "r", encoding="utf-8") as fin:
            entity_info = json.load(fin)
        self.entity_names = {}      # entity -> list of full names/aliases
        self.ngram_mapping = {}     # lowercased ngram -> (entity, oov_flag)
        self.domain_mapping = {}    # lowercased domain -> entity
        for entity, info in entity_info.items():
            self.entity_names[entity] = info["aliases"]
            for domain in info["domains"]:
                self.domain_mapping[domain.lower()] = entity
            for ngram, oov_flag in info["ngrams"].items():
                self.ngram_mapping[ngram.lower()] = (entity, oov_flag)
        # \L<keywords> is the regex module's named-list feature; with IGNORECASE
        # the lowercased keys match exactly the same strings as the originals.
        self.keyword_matching_regex = regex.compile(
            r"\b(?:\L<keywords>)\b",
            keywords=self.ngram_mapping.keys(),
            flags=regex.IGNORECASE
        )

    def match_name(self, name):
        """Yield canonical entity names that *name* plausibly refers to.

        A domain-name hit short-circuits with a single result; otherwise every
        ngram occurrence in *name* is checked.
        """
        if name.lower() in self.domain_mapping:
            yield self.domain_mapping[name.lower()]
            return
        for m in self.keyword_matching_regex.finditer(name):
            entity, oov_flag = self.ngram_mapping[m[0].lower()]
            if oov_flag:
                # Out-of-vocabulary ngram: distinctive enough to accept directly.
                yield entity
            else:
                # Common-word ngram: accept only if it also occurs inside one of
                # the entity's known full names (word-boundary, case-sensitive,
                # matching the original behavior).
                r = regex.compile(r"\b(?:\L<keywords>)\b", keywords=[m[0]])
                for full_name in self.entity_names[entity]:
                    if r.search(full_name):
                        yield entity
                        break
def main() -> None:
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument("workdirs", nargs="+", help="Input directories")
parser.add_argument("--model", default="gpt-4o-mini", help="Model name")
parser.add_argument("--n-tries", type=int, default=10, help="Number of LLM outputs")
parser.add_argument("--entity-info", required=True, help="Entity information file")
args = parser.parse_args()
entity_matcher = EntityMatcher(args.entity_info)
for d in args.workdirs:
logging.info("Processing %s ...", d)
# with open(os.path.join(d, 'document.json'), 'rb') as fin:
# doc = json.load(fin)
with open(os.path.join(d, 'content.md'), 'r', encoding="utf-8") as fin:
content = fin.read()
with open(os.path.join(d, 'policy_statements.jsonl'), 'r', encoding='utf-8') as fin:
statements: list[Statement] = []
for line in fin:
item = json.loads(line)
statements.append(item["statement"])
entity_phrase_ids: set[PhraseIdentifier] = set()
result_counters: dict[str, dict[str, int]] = {}
for stmt in statements:
for item in chain(stmt.get("processor", []), stmt.get("recipient", [])):
phrase_id = tuple(item)
entity_phrase_ids.add(phrase_id)
result_counters[phrase_id[1]] = {'first party': 0, 'third party': 0, 'other': 0}
messages = [
{"role": "user", "content": PROMPT % (content, "\n".join(sorted(result_counters.keys())))},
]
response = completion(
model=args.model,
messages=messages,
response_format={"type": "json_object"},
n=args.n_tries,
)
for choice in response.choices:
raw_message = choice.message.content
try:
gpt_results = json.loads(raw_message)
except json.JSONDecodeError:
logging.error('Failed to parse JSON: %r', raw_message)
continue
logging.info('Response: %r', gpt_results)
for entity, item in result_counters.items():
match gpt_results.get(entity):
case "first party":
item['first party'] += 1
case "third party":
item['third party'] += 1
case _:
item['other'] += 1
normalization_results: list[PhraseNormalizationResult] = [
{"phrase_id": (-1, "first party"), "referents": []},
{"phrase_id": (-1, "third party"), "referents": []},
]
for phrase_id in sorted(entity_phrase_ids):
_, phrase = phrase_id
matched_phrase = max(result_counters, key=lambda x: fuzz.ratio(x, phrase), default='')
if matched_phrase.lower() != phrase.lower():
continue
counter = result_counters[matched_phrase]
label = max(counter.keys(), key=counter.get) # type: ignore
if label == 'first party':
normalization_results[0]['referents'].append(phrase_id) # type: ignore
normalization_results.append({"phrase_id": phrase_id})
elif label == 'third party':
normalization_results[1]['referents'].append(phrase_id) # type: ignore
extra_matched_entities = list(entity_matcher.match_name(phrase))
if extra_matched_entities:
logging.info("%r -> %s", phrase, extra_matched_entities)
normalization_results.append({"phrase_id": phrase_id, "referents": extra_matched_entities})
with open(os.path.join(d, 'entities.jsonl'), 'w', encoding='utf-8') as fout:
for item in normalization_results:
print(json.dumps(item), file=fout)
if __name__ == '__main__':
main()