2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -42,7 +42,7 @@ jobs:
    runs-on: ubuntu-latest
    strategy:
      matrix:
-        worker: [aragorn, aragorn_lookup, aragorn_omnicorp, aragorn_score, arax, bte, bte_lookup, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score]
+        worker: [aragorn, aragorn_lookup, aragorn_pathfinder, aragorn_omnicorp, aragorn_score, arax, bte, bte_lookup, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score]
    steps:
      - name: Check out the repo
        uses: actions/checkout@v4
14 changes: 14 additions & 0 deletions compose.yml
@@ -214,6 +214,20 @@ services:
    volumes:
      - ./logs:/app/logs
      - ./.env:/app/.env
  aragorn_pathfinder:
    container_name: aragorn_pathfinder
    build:
      context: .
      dockerfile: workers/aragorn_pathfinder/Dockerfile
    restart: unless-stopped
    depends_on:
      shepherd_db:
        condition: service_healthy
      shepherd_broker:
        condition: service_healthy
    volumes:
      - ./logs:/app/logs
      - ./.env:/app/.env
  aragorn_omnicorp:
    container_name: aragorn_omnicorp
    build:
2 changes: 1 addition & 1 deletion shepherd_server/openapi-config.yaml
@@ -4,7 +4,7 @@ contact:
  x-id: https://github.com/maximusunc
  x-role: responsible developer
description: '<img src="/static/favicon.png" width="200px"><br /><br />Shepherd: Translator Autonomous Relay Agent Platform'
-version: 0.4.6
+version: 0.5.0
servers:
  - description: Default server
    url: https://shepherd.renci.org
2 changes: 2 additions & 0 deletions shepherd_utils/broker.py
@@ -88,6 +88,8 @@ async def get_task(stream, group, consumer, logger: logging.Logger):

    except Exception as e:
        logger.info(f"Failed to get task for {stream}, {e}")
        # wait a second before trying again to handle intermittent disconnections
        await asyncio.sleep(1)
        pass
    return None
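
The two added lines make get_task pause for a second when a broker read fails, rather than returning immediately and being re-polled in a tight loop. A minimal sketch of the polling pattern this supports is below; the poll_for_tasks wrapper is illustrative (not code from this PR), and the stream/group/consumer names are borrowed from the test suite:

import asyncio
import logging

from shepherd_utils.broker import get_task

logger = logging.getLogger(__name__)


async def poll_for_tasks() -> None:
    """Illustrative worker loop around get_task.

    get_task returns None after a failed read (having already slept for a
    second), so the caller can simply keep polling.
    """
    while True:
        task = await get_task("aragorn.score", "consumer", "test", logger)
        if task is None:
            # no task yet, or a transient broker hiccup; just try again
            continue
        # ... process the task and acknowledge it here ...


if __name__ == "__main__":
    asyncio.run(poll_for_tasks())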

7 changes: 7 additions & 0 deletions shepherd_utils/config.py
@@ -23,6 +23,13 @@ class Settings(BaseSettings):
    sync_kg_retrieval_url: str = "https://strider.renci.org/query"
    omnicorp_url: str = "https://aragorn-ranker.renci.org/omnicorp_overlay"
    arax_url: str = "https://arax.ncats.io/shepherd/api/arax/v1.4/query"
    node_norm: str = "https://nodenormalization-sri.renci.org/"

    pathfinder_redis_host: str = "host.docker.internal"
    pathfinder_redis_port: int = 6383
    pathfinder_redis_password: str = "supersecretpassword"
    pathfinder_pmid_db: int = 1
    pathfinder_curies_db: int = 2

    otel_enabled: bool = True
    jaeger_host: str = "http://jaeger"
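
Because Settings extends Pydantic's BaseSettings, each new value can be overridden through the mounted .env file or an environment variable (e.g. PATHFINDER_REDIS_HOST, assuming no custom env prefix is configured). A minimal sketch of how a pathfinder worker might consume these settings, assuming redis-py as the client (the diff itself does not show which Redis client the worker uses):

import redis.asyncio as aioredis

from shepherd_utils.config import settings

# hypothetical helpers: connect to the PMID and CURIE databases configured above
pmid_db = aioredis.Redis(
    host=settings.pathfinder_redis_host,
    port=settings.pathfinder_redis_port,
    password=settings.pathfinder_redis_password,
    db=settings.pathfinder_pmid_db,
)
curies_db = aioredis.Redis(
    host=settings.pathfinder_redis_host,
    port=settings.pathfinder_redis_port,
    password=settings.pathfinder_redis_password,
    db=settings.pathfinder_curies_db,
)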
4 changes: 1 addition & 3 deletions shepherd_utils/logger.py
@@ -103,9 +103,7 @@ def get_logging_config():
    logging_config = {
        "version": 1,
        "formatters": {
-            "default": {
-                "format": "[%(asctime)s: %(levelname)s/%(name)s]: %(message)s"
-            }
+            "default": {"format": "[%(asctime)s: %(levelname)s/%(name)s]: %(message)s"}
        },
        "handlers": handlers,
        "loggers": {
121 changes: 120 additions & 1 deletion shepherd_utils/shared.py
@@ -1,11 +1,13 @@
"""Shared Shepherd Utility Functions."""

import asyncio
import copy
import json
import logging
-from typing import AsyncGenerator, Dict, List, Tuple

from opentelemetry.context.context import Context
from opentelemetry.propagate import extract
+from typing import AsyncGenerator, Dict, List, Tuple, Union

from .broker import add_task, get_task, mark_task_as_complete
from .config import settings
@@ -181,3 +183,120 @@ def validate_message(message, logger):
    if not valid:
        with open("invalid_message.json", "w", encoding="utf-8") as f:
            json.dump(message, f, indent=2)


def combine_unique_dicts(list1, list2, logger: logging.Logger):
    """Combine two lists of dicts, keeping only unique dictionaries."""

    def make_list_hashable(l):
        """Convert a list to a hashable tuple."""
        frozen_items = []
        for item in l:
            if isinstance(item, list):
                frozen_items.append(make_list_hashable(item))
            elif isinstance(item, dict):
                frozen_items.append(make_hashable(item))
            else:
                frozen_items.append(item)
        return tuple(sorted(frozen_items))

    def make_hashable(d):
        """Convert lists to tuples to make the dict hashable."""
        hashable_items = []
        for key, value in d.items():
            if isinstance(value, list):
                if all(isinstance(item, str) for item in value):
                    hashable_items.append((key, tuple(value)))
                else:
                    hashable_items.append((key, make_list_hashable(value)))
            elif isinstance(value, dict):
                # handle nested dicts recursively
                hashable_items.append((key, frozenset(make_hashable(value))))
            else:
                hashable_items.append((key, value))
        return tuple(sorted(hashable_items))

    seen = set()
    result = []

    for d in list1 + list2:  # process every item from both lists
        dict_signature = make_hashable(d)
        try:
            if dict_signature not in seen:
                seen.add(dict_signature)
                result.append(d)  # only keep dicts we haven't seen before
        except Exception:
            logger.error(f"Failed to hash this: {dict_signature}")

    return result
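
Taken on its own, the helper behaves like set-deduplication over dicts; an illustrative call (the attribute payloads are invented for the example):

import logging

from shepherd_utils.shared import combine_unique_dicts

logger = logging.getLogger(__name__)

existing_attributes = [
    {"attribute_type_id": "biolink:agent_type", "value": "computational_model"},
    {"attribute_type_id": "biolink:support_graphs", "value": ["sg_1", "sg_2"]},
]
incoming_attributes = [
    {"attribute_type_id": "biolink:agent_type", "value": "computational_model"},
]

merged = combine_unique_dicts(existing_attributes, incoming_attributes, logger)
assert len(merged) == 2  # the duplicate agent_type attribute is only kept once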


def merge_kgraph(og_message, new_message, logger: logging.Logger):
    """Merge two TRAPI kgraphs together."""
    merged_kgraph = copy.deepcopy(og_message)
    for key, value in new_message["nodes"].items():
        existing = og_message["nodes"].get(key, None)
        if existing is not None:
            # merge
            if value["name"]:
                merged_kgraph["nodes"][key]["name"] = value["name"]
            if value["categories"]:
                if existing["categories"]:
                    all_categories = (
                        merged_kgraph["nodes"][key]["categories"] + value["categories"]
                    )
                    merged_kgraph["nodes"][key]["categories"] = list(
                        set(all_categories)
                    )
                else:
                    merged_kgraph["nodes"][key]["categories"] = value["categories"]
            if value["attributes"]:
                if existing["attributes"]:
                    merged_kgraph["nodes"][key]["attributes"] = combine_unique_dicts(
                        existing["attributes"],
                        value["attributes"],
                        logger,
                    )
                else:
                    merged_kgraph["nodes"][key]["attributes"] = value["attributes"]
        else:
            merged_kgraph["nodes"][key] = value

    for key, value in new_message["edges"].items():
        existing = og_message["edges"].get(key, None)
        if existing is not None:
            # merge
            if value["attributes"]:
                if existing["attributes"]:
                    new_attributes = []
                    # filter out the incoming knowledge_level and agent_type attributes
                    for attribute in value["attributes"]:
                        if attribute["attribute_type_id"] not in (
                            "biolink:knowledge_level",
                            "biolink:agent_type",
                        ):
                            # don't add any new KL/AT
                            new_attributes.append(attribute)
                    merged_kgraph["edges"][key]["attributes"] = combine_unique_dicts(
                        existing["attributes"],
                        new_attributes,
                        logger,
                    )
                else:
                    merged_kgraph["edges"][key]["attributes"] = value["attributes"]

            if value["sources"]:
                if existing["sources"]:
                    new_sources = combine_unique_dicts(
                        existing["sources"],
                        value["sources"],
                        logger,
                    )
                    # TODO: there may be some upstream resource id merging to do beyond this
                    merged_kgraph["edges"][key]["sources"] = new_sources
                else:
                    merged_kgraph["edges"][key]["sources"] = value["sources"]
        else:
            merged_kgraph["edges"][key] = value

    return merged_kgraph
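
A self-contained illustration of the merge behaviour; the kgraph snippets are invented and trimmed to just the fields the function reads (name, categories, attributes, sources):

import logging

from shepherd_utils.shared import merge_kgraph

logger = logging.getLogger(__name__)

og_kgraph = {
    "nodes": {
        "MONDO:0005148": {
            "name": "type 2 diabetes mellitus",
            "categories": ["biolink:Disease"],
            "attributes": [],
        },
    },
    "edges": {
        "e0": {
            "attributes": [{"attribute_type_id": "biolink:agent_type", "value": "manual_agent"}],
            "sources": [{"resource_id": "infores:aragorn", "resource_role": "primary_knowledge_source"}],
        },
    },
}
new_kgraph = {
    "nodes": {
        "MONDO:0005148": {
            "name": "type 2 diabetes mellitus",
            "categories": ["biolink:Disease", "biolink:DiseaseOrPhenotypicFeature"],
            "attributes": [],
        },
        "CHEBI:6801": {
            "name": "metformin",
            "categories": ["biolink:SmallMolecule"],
            "attributes": [],
        },
    },
    "edges": {
        "e0": {
            "attributes": [{"attribute_type_id": "biolink:knowledge_level", "value": "prediction"}],
            "sources": [{"resource_id": "infores:aragorn", "resource_role": "primary_knowledge_source"}],
        },
        "e1": {
            "attributes": [],
            "sources": [{"resource_id": "infores:automat-robokop", "resource_role": "primary_knowledge_source"}],
        },
    },
}

merged = merge_kgraph(og_kgraph, new_kgraph, logger)

assert set(merged["nodes"]) == {"MONDO:0005148", "CHEBI:6801"}
assert len(merged["nodes"]["MONDO:0005148"]["categories"]) == 2
assert len(merged["edges"]["e0"]["sources"]) == 1  # identical sources are deduplicated
assert "e1" in merged["edges"]  # edges only present in the new kgraph are carried over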
5 changes: 4 additions & 1 deletion tests/unit/aragorn/test_aragorn_lookup.py
@@ -21,7 +21,8 @@ async def test_aragorn_creative_lookup(redis_mock, mocker):
    )
    mock_running_callbacks.return_value = []
    mock_response = mocker.Mock()
-    mocker.patch("httpx.AsyncClient.post", return_value=mock_response)
+    mock_response.status_code = 200
+    mock_httpx = mocker.patch("httpx.AsyncClient.post", return_value=mock_response)
    logger = logging.getLogger(__name__)

    await aragorn_lookup(
@@ -40,6 +41,8 @@ async def test_aragorn_creative_lookup(redis_mock, mocker):
        logger,
    )

+    assert mock_httpx.called

    # Get the task that the ara should have put on the queue
    task = await get_task("aragorn.score", "consumer", "test", logger)
    assert task is not None
3 changes: 3 additions & 0 deletions tests/unit/test_merge_message.py
@@ -22,11 +22,14 @@ async def test_message_merge(redis_mock, mocker):
    callback_response = copy.deepcopy(response_2)
    original_response = generate_response()

    logger = logging.getLogger(__name__)

    merged_message = merge_messages(
        "test_ara",
        lookup_query_graph,
        original_query_graph,
        result_messages=[original_response, callback_response],
        logger=logger,
    )

    assert len(merged_message["message"]["results"]) == 3
20 changes: 15 additions & 5 deletions workers/aragorn/worker.py
@@ -31,12 +31,22 @@ def examine_query(message):
        qedges = message.get("message", {}).get("query_graph", {}).get("edges", {})
    except:
        qedges = {}
+    try:
+        # this can still fail if the input looks like e.g.:
+        # "query_graph": None
+        qpaths = message.get("message", {}).get("query_graph", {}).get("paths", {})
+    except:
+        qpaths = {}
+    if len(qpaths) > 1:
+        raise Exception("Only a single path is supported", 400)
+    if (len(qpaths) > 0) and (len(qedges) > 0):
+        raise Exception("Mixed mode pathfinder queries are not supported", 400)
+    pathfinder = len(qpaths) == 1
    n_infer_edges = 0
    for edge_id in qedges:
        if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred":
            n_infer_edges += 1
-    pathfinder = n_infer_edges == 3
-    if n_infer_edges > 1 and n_infer_edges and not pathfinder:
+    if n_infer_edges > 1 and n_infer_edges:
        raise Exception("Only a single infer edge is supported", 400)
    if (n_infer_edges > 0) and (n_infer_edges < len(qedges)):
        raise Exception("Mixed infer and lookup queries not supported", 400)
@@ -98,9 +108,9 @@ async def aragorn(task, logger: logging.Logger):
        ]
    elif pathfinder:
        workflow = [
-            {"id": "aragorn.lookup"},
-            {"id": "aragorn.omnicorp"},
-            {"id": "aragorn.score"},
+            {"id": "aragorn.pathfinder"},
+            # {"id": "aragorn.omnicorp"},
+            # {"id": "aragorn.score"},
            {"id": "filter_kgraph_orphans"},
        ]
    else:
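
The pathfinder branch is driven purely by the shape of the query graph: examine_query counts the entries under query_graph.paths and query_graph.edges, so a hypothetical query like the one below (node IDs and the inner path fields are illustrative) routes to the aragorn.pathfinder workflow, while more than one path, or a mix of paths and edges, is rejected with a 400-style error:

pathfinder_query = {
    "message": {
        "query_graph": {
            "nodes": {
                "n0": {"ids": ["CHEBI:6801"]},
                "n1": {"ids": ["MONDO:0005148"]},
            },
            "edges": {},
            "paths": {
                "p0": {"subject": "n0", "object": "n1"},
            },
        }
    }
}

With exactly one path and no edges, the workflow reduces to aragorn.pathfinder followed by filter_kgraph_orphans, as in the diff above.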
12 changes: 9 additions & 3 deletions workers/aragorn_lookup/worker.py
@@ -171,13 +171,19 @@ async def aragorn_lookup(task, logger: logging.Logger):

    for response in responses:
        if isinstance(response, Exception):
-            logger.error(f"Failed to do lookup and unable to remove callback id: {response}")
+            logger.error(
+                f"Failed to do lookup and unable to remove callback id: {response}"
+            )
        elif isinstance(response, AsyncResponse):
            if not response.success:
-                logger.error(f"Failed to do lookup, removing callback id: {response.error}")
+                logger.error(
+                    f"Failed to do lookup, removing callback id: {response.error}"
+                )
                await remove_callback_id(response.callback_id, logger)
        else:
-            logger.error(f"Failed to do lookup and unable to remove callback id: {response}")
+            logger.error(
+                f"Failed to do lookup and unable to remove callback id: {response}"
+            )

    # this worker might have a timeout set in case the lookups don't finish within a certain
    # amount of time
34 changes: 34 additions & 0 deletions workers/aragorn_pathfinder/Dockerfile
@@ -0,0 +1,34 @@
# Use RENCI python base image
FROM ghcr.io/translatorsri/renci-python-image:3.11.5

# Add image info
LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd

ENV PYTHONHASHSEED=0

# set up requirements
WORKDIR /app

# make sure all is writeable for the nru USER later on
RUN chmod -R 777 .

# Install requirements
COPY ./shepherd_utils ./shepherd_utils
COPY ./pyproject.toml .
RUN pip install .

COPY ./workers/aragorn_pathfinder/requirements.txt .
RUN pip install -r requirements.txt

# switch to the non-root user (nru). defined in the base image
USER nru

# Copy in files
COPY ./workers/aragorn_pathfinder ./

# Set up base for command and any variables
# that shouldn't be modified
# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"]

# Variables that can be overridden
CMD ["python", "worker.py"]
Empty file.