Skip to content

Commit bfb6747

Browse files
authored
Merge pull request #59 from BioPack-team/merge_message_bugs
Merge message bugs
2 parents 1c1ce92 + cb56f09 commit bfb6747

File tree

10 files changed

+123
-74
lines changed

10 files changed

+123
-74
lines changed

shepherd_broker/redis.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ requirepass supersecretpassword
22
port 6379
33
maxmemory 4gb
44
# evict least recently used keys when memory cap is hit
5-
maxmemory-policy volatile-lfu
5+
maxmemory-policy allkeys-lru
66
loglevel notice
77
# If we want to log to a file
88
logfile /data/shepherd_broker.log
99
save 60 50
1010
stop-writes-on-bgsave-error no
11+
dir /data
1112
dbfilename shepherd_broker.rdb
1213

1314
# enable larger entry writes

shepherd_server/openapi-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ contact:
44
x-id: https://github.com/maximusunc
55
x-role: responsible developer
66
description: '<img src="/static/favicon.png" width="200px"><br /><br />Shepherd: Translator Autonomous Relay Agent Platform'
7-
version: 0.5.1
7+
version: 0.5.2
88
servers:
99
- description: Default server
1010
url: https://shepherd.renci.org

shepherd_utils/config.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class Settings(BaseSettings):
2323
sync_kg_retrieval_url: str = "https://strider.renci.org/query"
2424
omnicorp_url: str = "https://aragorn-ranker.renci.org/omnicorp_overlay"
2525
arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query"
26-
node_norm: str = "https://nodenormalization-sri.renci.org/"
26+
node_norm: str = "https://biothings.ci.transltr.io/nodenorm/api/"
2727

2828
pathfinder_redis_host: str = "host.docker.internal"
2929
pathfinder_redis_port: int = 6383
@@ -35,6 +35,9 @@ class Settings(BaseSettings):
3535
jaeger_host: str = "http://jaeger"
3636
jaeger_port: int = 4317
3737

38+
# ttl in seconds
39+
redis_ttl: int = 1210000
40+
3841
class Config:
3942
env_file = ".env"
4043

shepherd_utils/db.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,16 @@ async def add_query(
105105
connection_pool=data_db_pool,
106106
)
107107
# print(f"Putting {query_id} on {ara_target} stream")
108-
await client.set(query_id, zstandard.compress(orjson.dumps(query)))
109-
await client.set(response_id, zstandard.compress(orjson.dumps(query)))
108+
await client.set(
109+
query_id,
110+
zstandard.compress(orjson.dumps(query)),
111+
ex=settings.redis_ttl,
112+
)
113+
await client.set(
114+
response_id,
115+
zstandard.compress(orjson.dumps(query)),
116+
ex=settings.redis_ttl,
117+
)
110118
await client.aclose()
111119
except Exception as e:
112120
# failed to put message in db
@@ -153,7 +161,11 @@ async def save_message(
153161
connection_pool=data_db_pool,
154162
)
155163
# print(f"Putting {query_id} on {ara_target} stream")
156-
await client.set(callback_id, compressed)
164+
await client.set(
165+
callback_id,
166+
compressed,
167+
ex=settings.redis_ttl,
168+
)
157169
await client.aclose()
158170
logger.debug(f"Saving message took {time.time() - start} seconds")
159171
except Exception as e:
@@ -226,7 +238,9 @@ async def save_logs(
226238
new_logs = list(handler.contents())
227239
new_logs.reverse()
228240
existing_logs.extend(new_logs)
229-
await client.set(response_id, orjson.dumps(existing_logs))
241+
await client.set(
242+
response_id, orjson.dumps(existing_logs), ex=settings.redis_ttl
243+
)
230244
await client.aclose()
231245
except Exception as e:
232246
logger.error(f"Failed to save logs for response {response_id}: {e}")

shepherd_utils/shared.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,16 @@ def recursive_get_auxgraph_edges(
162162
return edges, auxgraphs, nodes
163163

164164

165+
def is_support_edge(edge) -> bool:
166+
"""Checks if a given edge is a support edge."""
167+
if "attributes" not in edge:
168+
return False
169+
for attribute in edge["attributes"]:
170+
if attribute["attribute_type_id"] == "biolink:support_graphs":
171+
return True
172+
return False
173+
174+
165175
def validate_message(message, logger):
166176
"""Validate a given message for missing nodes."""
167177
valid = True
@@ -231,7 +241,7 @@ def make_hashable(d):
231241
return result
232242

233243

234-
def merge_kgraph(og_message, new_message, logger: logging.Logger):
244+
def merge_kgraph(og_message, new_message, source, logger: logging.Logger):
235245
"""Merge two TRAPI kgraphs together."""
236246
merged_kgraph = copy.deepcopy(og_message)
237247
for key, value in new_message["nodes"].items():
@@ -299,4 +309,18 @@ def merge_kgraph(og_message, new_message, logger: logging.Logger):
299309
else:
300310
merged_kgraph["edges"][key] = value
301311

312+
if value["sources"] and not is_support_edge(value):
313+
new_sources = combine_unique_dicts(
314+
value["sources"],
315+
[
316+
{
317+
"resource_id": source,
318+
"resource_role": "aggregator_knowledge_source",
319+
"upstream_resource_ids": ["infores:retriever"],
320+
}
321+
],
322+
logger,
323+
)
324+
merged_kgraph["edges"][key]["sources"] = new_sources
325+
302326
return merged_kgraph

tests/unit/test_merge_message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ async def test_message_merge(redis_mock, mocker):
2626

2727
merged_message = merge_messages(
2828
"test_ara",
29-
lookup_query_graph,
3029
original_query_graph,
31-
result_messages=[original_response, callback_response],
30+
original_response,
31+
callback_response,
3232
logger=logger,
3333
)
3434

workers/aragorn_lookup/worker.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,6 @@ async def aragorn_lookup(task, logger: logging.Logger):
128128
# Put callback UID and query ID in postgres
129129
callback_id = str(uuid.uuid4())[:8]
130130
await add_callback_id(query_id, callback_id, logger)
131-
# put lookup query graph in redis
132-
await save_message(
133-
f"{query_id}_lookup_query_graph", message["message"]["query_graph"], logger
134-
)
135131
message["callback"] = f"{settings.callback_host}/aragorn/callback/{callback_id}"
136132

137133
async with httpx.AsyncClient(timeout=100) as client:
@@ -142,12 +138,6 @@ async def aragorn_lookup(task, logger: logging.Logger):
142138
else:
143139
expanded_messages = expand_aragorn_query(message)
144140
requests = []
145-
# put lookup query graph in redis
146-
await save_message(
147-
f"{query_id}_lookup_query_graph",
148-
expanded_messages[0]["message"]["query_graph"],
149-
logger,
150-
)
151141
# send all messages to lookup service
152142
async with httpx.AsyncClient(timeout=20) as client:
153143
for expanded_message in expanded_messages:

workers/aragorn_pathfinder/worker.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ async def shadowfax(task, logger: logging.Logger):
136136
parameters["tiers"] = parameters.get("tiers") or [0]
137137
message["parameters"] = parameters
138138

139+
source = "infores:shepherd-aragorn"
140+
139141
qgraph = message["message"]["query_graph"]
140142
pinned_node_keys = []
141143
pinned_node_ids = []
@@ -202,7 +204,9 @@ async def shadowfax(task, logger: logging.Logger):
202204
pairwise_pubs = get_the_pmids([source_node, target_node])
203205
if source_pubs == 0 or target_pubs == 0 or len(pairwise_pubs) == 0:
204206
logger.info("No publications found.")
205-
return message, 200
207+
await wrap_up_task(STREAM, GROUP, task, workflow, logger)
208+
logger.info(f"Task took {time.time() - start}")
209+
return
206210

207211
# Find other nodes from those shared publications
208212
curies = set()
@@ -218,12 +222,16 @@ async def shadowfax(task, logger: logging.Logger):
218222

219223
if len(curies) == 0:
220224
logger.info("No curies found.")
221-
return message, 200
225+
await wrap_up_task(STREAM, GROUP, task, workflow, logger)
226+
logger.info(f"Task took {time.time() - start}")
227+
return
222228

223229
normalizer_response = await get_normalized_curies(list(curies), logger)
224230
if normalizer_response is None:
225231
logger.error("Failed to get a good response from Node Normalizer")
226-
return message, 500
232+
await wrap_up_task(STREAM, GROUP, task, workflow, logger)
233+
logger.info(f"Task took {time.time() - start}")
234+
return
227235

228236
curie_info = defaultdict(dict)
229237
for curie, normalizer_info in normalizer_response.items():
@@ -306,7 +314,7 @@ async def shadowfax(task, logger: logging.Logger):
306314
try:
307315
logger.debug(f"Got back {len(lookup_message.get('results', 0))} results.")
308316
merged_kgraph = merge_kgraph(
309-
merged_kgraph, lookup_message["knowledge_graph"], logger
317+
merged_kgraph, lookup_message["knowledge_graph"], source, logger
310318
)
311319
merged_aux_graphs.update(lookup_message["auxiliary_graphs"])
312320
except KeyError as e:
@@ -475,7 +483,7 @@ async def shadowfax(task, logger: logging.Logger):
475483
aux_edges_keys.append(aux_graph_key)
476484

477485
analysis = {
478-
"resource_id": "infores:aragorn",
486+
"resource_id": source,
479487
"path_bindings": {
480488
path_key: [{"id": aux_graph_key, "attributes": []}],
481489
},

workers/bte_lookup/worker.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,6 @@ async def bte_lookup(task, logger: logging.Logger):
131131
# Put callback UID and query ID in postgres
132132
callback_id = str(uuid.uuid4())[:8]
133133
await add_callback_id(query_id, callback_id, logger)
134-
# put lookup query graph in redis
135-
await save_message(
136-
f"{query_id}_lookup_query_graph", message["message"]["query_graph"], logger
137-
)
138134
message["callback"] = f"{settings.callback_host}/bte/callback/{callback_id}"
139135

140136
async with httpx.AsyncClient(timeout=100) as client:
@@ -146,16 +142,6 @@ async def bte_lookup(task, logger: logging.Logger):
146142
expanded_messages = expand_bte_query(message, logger)
147143
logger.info(f"Expanded to {len(expanded_messages)} messages")
148144
requests = []
149-
if len(expanded_messages) == 0:
150-
lookup_query_graph = message["message"]["query_graph"]
151-
else:
152-
lookup_query_graph = expanded_messages[0]["message"]["query_graph"]
153-
# put lookup query graph in redis
154-
await save_message(
155-
f"{query_id}_lookup_query_graph",
156-
lookup_query_graph,
157-
logger,
158-
)
159145
# send all messages to retriever
160146
async with httpx.AsyncClient(timeout=20) as client:
161147
for expanded_message in expanded_messages:

0 commit comments

Comments
 (0)