-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdistance_model.py
More file actions
163 lines (132 loc) · 5.26 KB
/
distance_model.py
File metadata and controls
163 lines (132 loc) · 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from WikiModel import WikiPage
from utils import get_wiki_url
class Entry:
def __init__(self, key, value):
self.key = key
self.value = value
def __lt__(self, other):
return self.key < other.key
def __gt__(self, other):
return self.key > other.key
def __eq__(self, other):
return self.key == other.key
def __hash__(self):
return hash(self.key)
def __repr__(self):
return repr(self.key) + ": " + repr(self.value)
class PriorityQueue:
def __init__(self, init_entries=None):
if init_entries is None:
init_entries = []
self.entries = init_entries
self.value_to_index = {entry.value: i for i, entry in enumerate(self.entries)}
for i in range(len(self.entries))[::-1]:
self.min_heapify(i)
@staticmethod
def left_child(index):
return index * 2 + 1
@staticmethod
def right_child(index):
return index * 2 + 2
@staticmethod
def parent(index):
return (index - 1) // 2
def change_key(self, value, new_key):
index = self.value_to_index[value]
entry = self.entries[index]
old_key = entry.key
if old_key <= new_key:
entry.key = old_key
else:
self.decrease_key(index, new_key)
def decrease_key(self, index, new_key):
target_entry = self.entries[index]
target_entry.key = new_key
selected_index = index
while True:
parent_index = PriorityQueue.parent(selected_index)
if parent_index < 0:
return
parent_entry = self.entries[parent_index]
parent_key = parent_entry.key
if parent_key < new_key:
return
self.entries[parent_index] = target_entry
self.value_to_index[target_entry.value] = parent_index
self.entries[selected_index] = parent_entry
self.value_to_index[parent_entry.value] = selected_index
selected_index = parent_index
def push(self, key, value):
if value in self.value_to_index:
old_entry_index = self.value_to_index[value]
old_entry = self.entries[old_entry_index]
old_key = old_entry.key
if old_key < key:
self.decrease_key(old_entry_index, key)
return
index = len(self.entries)
self.entries.append(Entry(key, value))
self.value_to_index[value] = index
self.decrease_key(index, key)
def min_heapify(self, start_index=0):
start_entry = self.entries[start_index]
left_index = PriorityQueue.left_child(start_index)
right_index = PriorityQueue.right_child(start_index)
n = len(self.entries)
def get_candidates():
if left_index < n:
yield left_index, self.entries[left_index]
if right_index < n:
yield right_index, self.entries[right_index]
yield start_index, start_entry
min_index, min_entry = min(get_candidates(), key=lambda candidate: candidate[1].key)
if min_index != start_index:
self.entries[min_index] = start_entry
self.value_to_index[start_entry.value] = min_index
self.entries[start_index] = min_entry
self.value_to_index[min_entry.value] = start_index
self.min_heapify(min_index)
def __contains__(self, item):
return item in self.value_to_index
def __repr__(self):
return ", ".join(repr(entry) for entry in self.entries)
def get_key(self, value):
if value in self.value_to_index:
return self.entries[self.value_to_index[value]].key
return None
def pop(self):
first_entry = self.entries[0]
self.entries[0] = self.entries[-1]
del self.entries[-1]
self.min_heapify()
return first_entry
PAGE_JUMP_DISADVANTAGE = 5
class DistanceModel:
def __init__(self, start_term, target_term):
self.result = None
start_page = WikiPage.generate(start_term, target_term, None)
self.start_term = start_term
self.target_term = target_term
init_entries = [Entry(distance, link.set_text_func(text_func)) for link, distance, text_func in
start_page.distance_generator()]
self.queue = PriorityQueue(init_entries)
def process_link(self, link, distance):
print("Processing", link, "at distance", distance)
if link.is_target:
self.result = link.get_linked_text()
return True
new_page = WikiPage.generate(link.original_link_text, self.target_term, link)
if new_page is None:
return False
for link, additional_distance, text_func in new_page.distance_generator():
new_distance = additional_distance + distance + PAGE_JUMP_DISADVANTAGE
old_distance = self.queue.get_key(link)
if old_distance is None:
self.queue.push(new_distance, link.set_text_func(text_func))
elif new_distance < old_distance:
self.queue.change_key(link, new_distance)
return False
def iterate(self):
print(self.queue)
entry = self.queue.pop()
return self.process_link(entry.value, entry.key)