Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ tier0:
connect_retries: 5 # Number of retries before declaring a connection failure.
grpc_max_send_message_length: -1 # gRPC max send message length in bytes (-1 for unlimited).
grpc_max_receive_message_length: -1 # gRPC max receive message length in bytes (-1 for unlimited).
dump_queries: false # Write out TRAPI and translated queries to a file in jsonl format.
tier1:
backend: elasticsearch
metadata_url: https://stars.renci.org/var/translator/releases/translator_kg/latest/graph-metadata.json
Expand Down
2 changes: 1 addition & 1 deletion config/openapi.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ x_trapi:
asyncquery: true # Supports async query type.
operations: [lookup]
batch_size_limit: 300 # Maximum IDs on any one node.
rate_limit: 300 # Maximum number of requests per minute.
rate_limit: 1000 # Maximum number of requests per minute.
test_data_location:
default:
url: https://raw.githubusercontent.com/NCATS-Tangerine/translator-api-registry/master/biothings_explorer/sri-test-bte-ara.json
Expand Down
2 changes: 1 addition & 1 deletion src/retriever/config/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class XTrapi(BaseModel):
] = 300
rate_limit: Annotated[
int, Field(description="Maximum number of requests per minute.")
] = 300
] = 1000
test_data_location: TestDataLocationObject = TestDataLocationObject()
externalDocs: ExternalDocs = ExternalDocs(
description="The values for version are restricted according to the regex in this external JSON schema. See schema and examples at url",
Expand Down
73 changes: 60 additions & 13 deletions src/retriever/lookup/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from retriever.lookup.qgx import QueryGraphExecutor
from retriever.lookup.utils import expand_qgraph, get_submitter
from retriever.lookup.validate import validate
from retriever.metadata.optable import OP_TABLE_MANAGER
from retriever.metadata.optable import (
OP_TABLE_MANAGER,
QueryNotTraversable,
UnsupportedConstraint,
)
from retriever.types.general import LookupArtifacts, QueryInfo
from retriever.types.trapi import (
AuxGraphID,
Expand Down Expand Up @@ -202,12 +206,14 @@ def passes_validation(

Prepares response with appropriate messages if not.
"""
validation_problems = validate(qgraph)
if validation_problems:
warnings, errors = validate(qgraph)
for warning in warnings:
job_log.warning(warning)
if len(errors) > 0:
job_log.error(
f"Query validation encountered {len(validation_problems)} error{'s' if len(validation_problems) > 1 else ''}. Error logs to follow:"
f"Query validation encountered {len(errors)} error{'s' if len(errors) > 1 else ''}. Error logs to follow:"
)
for problem in validation_problems:
for problem in errors:
job_log.error(f"Validation Error: {problem}")
job_log.error("Due to the above errors, your query terminates.")

Expand All @@ -230,17 +236,58 @@ async def qgraph_supported(

Prepares response with appropriate messages if not.
"""
operation_plan = await OP_TABLE_MANAGER.create_operation_plan(qgraph, tiers)
if isinstance(operation_plan, list):
job_log.warning(
f"MetaEdges could not be found for the following QEdge(s): {operation_plan}"
supported, plan_or_report = await OP_TABLE_MANAGER.create_operation_plan(
qgraph, tiers
)
if not supported:
missing_edges = [
qedge_id
for qedge_id, reason in plan_or_report.items()
if isinstance(reason, QueryNotTraversable)
]
unsupported_constraints = {
qedge_id: reason
for qedge_id, reason in plan_or_report.items()
if isinstance(reason, UnsupportedConstraint)
}
if len(missing_edges) > 0:
job_log.warning(
f"MetaEdges could not be found for the following QEdge(s): {missing_edges}"
)
if len(unsupported_constraints) > 0:
for qedge_id, reason in unsupported_constraints.items():
job_log.warning(
f"Unsupported constraint(s) present in QEdge `{qedge_id}` (if multiple, try a smaller combination): {reason.unmet}"
)
status = (
"QueryNotTraversable"
if "QueryNotTraversable" in plan_or_report.values()
else "UnsupportedConstraint"
)
response["status"] = status
reason_desc = (
"missing MetaEdges"
if status == "QueryNotTraversable"
else "unsupported constraints"
)
response["status"] = "QueryNotTraversable"
response["description"] = (
"Query cannot be traversed due to missing metaEdges. See logs for details."
f"Query cannot be traversed due to {reason_desc}. See logs for details."
)
return False
return True

unsupported_nodes = await OP_TABLE_MANAGER.qnodes_supported(qgraph, tiers)
if unsupported_nodes is not None:
for qnode_id, reason in unsupported_nodes.items():
job_log.warning(
f"Unsupported constraint(s) present in QNode `{qnode_id}` (if multiple, try a smaller combination): {reason.unmet}"
)
if response.get("status") != "QueryNotTraversable":
response["status"] = "UnsupportedConstraint"
response["description"] = (
"Query cannot be traversed due to unsupported constraints. See logs for details."
)

supported = supported and unsupported_nodes is None
return supported


@tracer.start_as_current_span("execute_lookup")
Expand Down
19 changes: 13 additions & 6 deletions src/retriever/lookup/qgx.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from asyncio.tasks import Task
from collections.abc import AsyncGenerator, Hashable, Iterable
from typing import Literal
from typing import Literal, cast

from opentelemetry import trace

Expand All @@ -19,7 +19,7 @@
from retriever.lookup.subclass import SubclassMapping
from retriever.lookup.subquery import get_subgraph, subquery
from retriever.lookup.utils import make_mappings
from retriever.metadata.optable import OP_TABLE_MANAGER
from retriever.metadata.optable import OP_TABLE_MANAGER, OperationPlan
from retriever.types.general import (
AdjacencyGraph,
KAdjacencyGraph,
Expand Down Expand Up @@ -162,12 +162,12 @@ async def execute(self) -> LookupArtifacts:
self.job_log.info(
f"Starting lookup against Tier {', '.join(str(t) for t in self.ctx.tiers if t > 0)}..."
)
operation_plan = await OP_TABLE_MANAGER.create_operation_plan(
supported, operation_plan = await OP_TABLE_MANAGER.create_operation_plan(
self.qgraph, {t for t in self.ctx.tiers if t > 0}
)
if isinstance(operation_plan, list):
if not supported:
self.job_log.warning(
f"Failed for find operations supporting query edge(s): {operation_plan}"
f"Failed for find operations supporting query edge(s): {list(operation_plan.keys())}"
)
return LookupArtifacts(
[], self.kgraph, self.aux_graphs, self.job_log.get_logs()
Expand All @@ -178,7 +178,14 @@ async def execute(self) -> LookupArtifacts:
starting_branches = await Branch.get_start_branches(
self.qedge_claims,
self.locks["claim"],
(self.qgraph, self.q_agraph, self.qedge_map, operation_plan),
(
self.qgraph,
self.q_agraph,
self.qedge_map,
cast(
OperationPlan, operation_plan
), # Should never be the alternative
),
self.job_log,
)
self.job_log.debug(
Expand Down
48 changes: 37 additions & 11 deletions src/retriever/lookup/validate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import get_type_hints

from reasoner_pydantic.shared import KnowledgeType

from retriever.config.openapi import OPENAPI_CONFIG
Expand All @@ -12,16 +14,20 @@
from retriever.utils import biolink


def validate(qg: QueryGraphDict | PathfinderQueryGraphDict) -> list[str]:
def validate(
qg: QueryGraphDict | PathfinderQueryGraphDict,
) -> tuple[list[str], list[str]]:
"""Check that a given query graph is valid.

Returns:
A list of messages detailing validation problems.
A list of warning messages, which do not fail validation but should be logged.
And a list of messages detailing validation problems.
If the list is empty, the graph passes validation.
"""
if "paths" in qg:
return ["Retriever does not support Pathfinder queries."]
problems: dict[str, bool] = {} # False means failing
return [], ["Retriever does not support Pathfinder queries."]
warnings = list[str]()
problems = dict[str, bool]() # False means failing
problems["Query graph must have at least one node"] = len(qg["nodes"].values()) > 0
problems["Query graph must have at least one edge"] = len(qg["edges"].values()) > 0
problems["Query graph must have at least one node with an ID"] = any(
Expand All @@ -32,17 +38,21 @@ def validate(qg: QueryGraphDict | PathfinderQueryGraphDict) -> list[str]:

# node_pairs = set[str]()
for qedge_id, qedge in qg["edges"].items():
problems.update(validate_qedge(qg, qedge_id, qedge))
edge_warnings, edge_problems = validate_qedge(qg, qedge_id, qedge)
problems.update(edge_problems)
warnings.extend(edge_warnings)

for qnode_id, qnode in qg["nodes"].items():
problems.update(validate_qnode(qg, qnode_id, qnode))
node_warnings, node_problems = validate_qnode(qg, qnode_id, qnode)
problems.update(node_problems)
warnings.extend(node_warnings)

return [name for name, passed in problems.items() if not passed]
return warnings, [name for name, passed in problems.items() if not passed]


def validate_qedge(
qg: QueryGraphDict, qedge_id: QEdgeID, qedge: QEdgeDict
) -> dict[str, bool]:
) -> tuple[list[str], dict[str, bool]]:
"""Find and return any problems with a given Query Edge.

Problems in the dictionary marked False are failing.
Expand Down Expand Up @@ -86,12 +96,20 @@ def validate_qedge(
# problems["Duplicate qedges not allowed."] = False
# node_pairs.add(f"{qedge.subject}-{qedge.object}")

return problems
warnings = list[str]()
known_fields = get_type_hints(QEdgeDict)
unknown_fields = [field for field in qedge if field not in known_fields]
if len(unknown_fields) > 0:
warnings.append(
f"Edge `{qedge_id}`: skipping unknown fields ({', '.join(unknown_fields)})"
)

return warnings, problems


def validate_qnode(
_qg: QueryGraphDict, qnode_id: QNodeID, qnode: QNodeDict
) -> dict[str, bool]:
) -> tuple[list[str], dict[str, bool]]:
"""Find and return any problems with a given Query Node.

Problems in the dictionary marked False are failing.
Expand All @@ -111,4 +129,12 @@ def validate_qnode(
False
)

return problems
warnings = list[str]()
known_fields = get_type_hints(QNodeDict)
unknown_fields = [field for field in qnode if field not in known_fields]
if len(unknown_fields) > 0:
warnings.append(
f"Node `{qnode_id}`: skipping unknown fields ({', '.join(unknown_fields)})"
)

return warnings, problems
Loading