Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion code_review_graph/changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def analyze_changes(
for node in changed_funcs:
if node.is_test:
continue
tested = store.get_edges_by_target(node.qualified_name)
tested = store.get_edges_by_source(node.qualified_name)
if not any(e.kind == "TESTED_BY" for e in tested):
test_gaps.append({
"name": _sanitize_name(node.name),
Expand Down
6 changes: 4 additions & 2 deletions code_review_graph/enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ def _format_node_context(
lines.append(f" Flows: {', '.join(flow_names)}")

# Tests
# TESTED_BY edges are stored as source=production, target=test by the
# parser, so look them up by source. See: #515
tests: list[str] = []
for e in store.get_edges_by_target(qn):
for e in store.get_edges_by_source(qn):
if e.kind == "TESTED_BY" and len(tests) < 3:
t = store.get_node(e.source_qualified)
t = store.get_node(e.target_qualified)
if t:
tests.append(t.name)
if tests:
Expand Down
47 changes: 25 additions & 22 deletions code_review_graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ def get_transitive_tests(
) -> list[dict]:
"""Find tests covering a node, including indirect (transitive) coverage.

1. Direct: TESTED_BY edges targeting this node (+ bare-name fallback).
TESTED_BY edges are stored as source=production, target=test by
the parser, so look them up by source_qualified. See: #515

1. Direct: TESTED_BY edges originating at this node (+ bare-name fallback).
2. Indirect: follow outgoing CALLS edges up to *max_depth* hops,
then collect TESTED_BY edges on each callee.

Expand Down Expand Up @@ -421,31 +424,31 @@ def _node_dict(qn: str, indirect: bool) -> dict | None:
"indirect": indirect,
}

# Direct TESTED_BY
# Direct TESTED_BY (source=production, target=test). See: #515
for qn in input_qns:
for row in conn.execute(
"SELECT source_qualified FROM edges "
"WHERE target_qualified = ? AND kind = 'TESTED_BY'",
"SELECT target_qualified FROM edges "
"WHERE source_qualified = ? AND kind = 'TESTED_BY'",
(qn,),
).fetchall():
src = row["source_qualified"]
if src not in seen:
seen.add(src)
d = _node_dict(src, indirect=False)
tgt = row["target_qualified"]
if tgt not in seen:
seen.add(tgt)
d = _node_dict(tgt, indirect=False)
if d:
results.append(d)

# Bare-name fallback for direct
bare = qualified_name.rsplit("::", 1)[-1] if "::" in qualified_name else qualified_name
for row in conn.execute(
"SELECT source_qualified FROM edges "
"WHERE target_qualified = ? AND kind = 'TESTED_BY'",
"SELECT target_qualified FROM edges "
"WHERE source_qualified = ? AND kind = 'TESTED_BY'",
(bare,),
).fetchall():
src = row["source_qualified"]
if src not in seen:
seen.add(src)
d = _node_dict(src, indirect=False)
tgt = row["target_qualified"]
if tgt not in seen:
seen.add(tgt)
d = _node_dict(tgt, indirect=False)
if d:
results.append(d)

Expand All @@ -464,14 +467,14 @@ def _node_dict(qn: str, indirect: bool) -> dict | None:
next_frontier = set(list(next_frontier)[:max_frontier])
for callee in next_frontier:
for row in conn.execute(
"SELECT source_qualified FROM edges "
"WHERE target_qualified = ? AND kind = 'TESTED_BY'",
"SELECT target_qualified FROM edges "
"WHERE source_qualified = ? AND kind = 'TESTED_BY'",
(callee,),
).fetchall():
src = row["source_qualified"]
if src not in seen:
seen.add(src)
d = _node_dict(src, indirect=True)
tgt = row["target_qualified"]
if tgt not in seen:
seen.add(tgt)
d = _node_dict(tgt, indirect=True)
if d:
results.append(d)
frontier = next_frontier
Expand Down Expand Up @@ -1269,8 +1272,8 @@ def load_flow_adjacency(self) -> "FlowAdjacency":
kind, src, tgt = row["kind"], row["source_qualified"], row["target_qualified"]
if kind == "CALLS":
calls_out.setdefault(src, []).append(tgt)
else: # TESTED_BY
has_tested_by.add(tgt)
else: # TESTED_BY: source is the production node being tested. See: #515
has_tested_by.add(src)

return FlowAdjacency(
calls_out=calls_out,
Expand Down
6 changes: 4 additions & 2 deletions code_review_graph/tools/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ def query_graph(
results.append(node_to_dict(child))

elif pattern == "tests_for":
for e in store.get_edges_by_target(qn):
# TESTED_BY edges are stored as source=production, target=test
# by the parser, so look them up by source. See: #515
for e in store.get_edges_by_source(qn):
if e.kind == "TESTED_BY":
test = store.get_node(e.source_qualified)
test = store.get_node(e.target_qualified)
if test:
results.append(node_to_dict(test))
# Also search by naming convention
Expand Down
12 changes: 7 additions & 5 deletions tests/test_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ def _add_call(self, source_qn: str, target_qn: str, path: str = "app.py") -> Non
self.store.upsert_edge(edge)
self.store.commit()

def _add_tested_by(self, test_qn: str, target_qn: str, path: str = "app.py") -> None:
def _add_tested_by(self, production_qn: str, test_qn: str, path: str = "app.py") -> None:
# TESTED_BY edges are stored as source=production, target=test
# by the parser. See: #515
edge = EdgeInfo(
kind="TESTED_BY",
source=test_qn,
target=target_qn,
source=production_qn,
target=test_qn,
file_path=path,
line=1,
)
Expand Down Expand Up @@ -225,7 +227,7 @@ def test_risk_score_untested_is_higher(self):
self._add_func("untested_func", path="a.py", line_start=1, line_end=10)
self._add_func("tested_func", path="b.py", line_start=1, line_end=10)
self._add_func("test_tested_func", path="test_b.py", is_test=True)
self._add_tested_by("test_b.py::test_tested_func", "b.py::tested_func", "test_b.py")
self._add_tested_by("b.py::tested_func", "test_b.py::test_tested_func", "test_b.py")

untested = self.store.get_node("a.py::untested_func")
tested = self.store.get_node("b.py::tested_func")
Expand Down Expand Up @@ -367,7 +369,7 @@ def test_analyze_detects_test_gaps(self):

# Only tested_c has a test.
self._add_func("test_c", path="test_app.py", is_test=True)
self._add_tested_by("test_app.py::test_c", "app.py::tested_c", "test_app.py")
self._add_tested_by("app.py::tested_c", "test_app.py::test_c", "test_app.py")

result = analyze_changes(
self.store,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def _seed_data(self):
),
EdgeInfo(
kind="TESTED_BY",
source=f"{self.tmpdir}/test_parser.py::test_parse_file",
target=f"{self.tmpdir}/parser.py::parse_file",
source=f"{self.tmpdir}/parser.py::parse_file",
target=f"{self.tmpdir}/test_parser.py::test_parse_file",
file_path=f"{self.tmpdir}/test_parser.py", line=1,
),
]
Expand Down
62 changes: 61 additions & 1 deletion tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,66 @@ def test_metadata(self):
assert self.store.get_metadata("test_key") == "test_value"
assert self.store.get_metadata("nonexistent") is None

def test_get_transitive_tests_follows_direct_tested_by_edge(self):
"""Regression test for #515: get_transitive_tests must follow
TESTED_BY edges by source_qualified (production) since the parser
stores source=production, target=test. The test function uses an
unconventional name so the bare-name fallback cannot mask the bug.
"""
self.store.upsert_node(self._make_file_node("/src/calc.py"))
self.store.upsert_node(self._make_func_node("add", "/src/calc.py"))
self.store.upsert_node(self._make_file_node("/tests/check.py"))
self.store.upsert_node(self._make_func_node(
"verify_addition", "/tests/check.py", is_test=True,
))
self.store.upsert_edge(EdgeInfo(
kind="TESTED_BY",
source="/src/calc.py::add",
target="/tests/check.py::verify_addition",
file_path="/tests/check.py", line=1,
))
self.store.commit()

results = self.store.get_transitive_tests("/src/calc.py::add")
qns = {r["qualified_name"] for r in results}
assert "/tests/check.py::verify_addition" in qns
assert all(not r["indirect"] for r in results)

def test_get_transitive_tests_follows_calls_then_tested_by(self):
"""Transitive coverage: caller -> CALLS -> callee -> TESTED_BY -> test.
Uses an unconventional test name so the bare-name fallback cannot
match. See: #515.
"""
self.store.upsert_node(self._make_file_node("/src/svc.py"))
self.store.upsert_node(self._make_func_node("orchestrate", "/src/svc.py"))
self.store.upsert_node(self._make_func_node("compute", "/src/svc.py"))
self.store.upsert_node(self._make_file_node("/tests/check.py"))
self.store.upsert_node(self._make_func_node(
"verify_compute", "/tests/check.py", is_test=True,
))
self.store.upsert_edge(EdgeInfo(
kind="CALLS", source="/src/svc.py::orchestrate",
target="/src/svc.py::compute", file_path="/src/svc.py", line=2,
))
self.store.upsert_edge(EdgeInfo(
kind="TESTED_BY",
source="/src/svc.py::compute",
target="/tests/check.py::verify_compute",
file_path="/tests/check.py", line=1,
))
self.store.commit()

results = self.store.get_transitive_tests(
"/src/svc.py::orchestrate", max_depth=2,
)
qns = {r["qualified_name"] for r in results}
assert "/tests/check.py::verify_compute" in qns
match = next(
r for r in results
if r["qualified_name"] == "/tests/check.py::verify_compute"
)
assert match["indirect"] is True

def test_get_all_community_ids_logs_when_column_missing(self, caplog):
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
Expand Down Expand Up @@ -401,7 +461,7 @@ def test_uncapped_small_frontier_unchanged(self):
# Only callee_2 has a test
if i == 2:
self.store.upsert_edge(EdgeInfo(
kind="TESTED_BY", source=test_qn, target=callee_qn,
kind="TESTED_BY", source=callee_qn, target=test_qn,
file_path="/t/test_hub.py", line=1,
))
self.store.commit()
Expand Down
33 changes: 33 additions & 0 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,39 @@ def test_tested_by_edges_generated(self):
tested_by = [e for e in edges if e.kind == "TESTED_BY"]
assert len(tested_by) >= 1

def test_tested_by_edge_direction(self):
"""Regression for #515: TESTED_BY must point production → test.

Producer-side guard. Reads naturally as "X is tested by Y":
source = production code, target = the test that covers it.
Consumer-side queries (tests_for, get_transitive_tests,
test-gap detection, flow criticality) were fixed in #515 to
match this canonical direction. Without this assertion the
parser could silently flip the direction and every consumer
test would still pass against the inverted edges.
"""
nodes, edges = self.parser.parse_file(FIXTURES / "test_sample.py")
tested_by = [e for e in edges if e.kind == "TESTED_BY"]
assert len(tested_by) >= 1, "fixture should yield at least one TESTED_BY edge"

test_file = str(FIXTURES / "test_sample.py")
test_qualified = {
f"{test_file}::{n.name}" for n in nodes if n.kind == "Test"
}
assert test_qualified, "fixture should yield at least one Test node"

for edge in tested_by:
assert edge.target in test_qualified, (
f"TESTED_BY edge has wrong direction: target={edge.target!r} "
f"is not a Test node from {test_file}. "
f"Expected target in {sorted(test_qualified)}. "
f"Edge: kind={edge.kind} source={edge.source} target={edge.target}"
)
assert edge.source not in test_qualified, (
f"TESTED_BY edge points test → test: "
f"{edge.source} → {edge.target}"
)

def test_recursion_depth_guard(self):
"""Parser should not crash on deeply nested code."""
# Generate Python code with many nested functions (> _MAX_AST_DEPTH)
Expand Down
66 changes: 66 additions & 0 deletions tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,72 @@ def test_validate_repo_root_error_mentions_svn_marker(self, tmp_path):
_validate_repo_root(tmp_path)


class TestQueryGraphTestsFor:
"""Regression tests for #515: query_graph(pattern='tests_for')
must follow direct TESTED_BY edges (source=production, target=test)
rather than relying on the naming-convention fallback.
"""

def setup_method(self, tmp_path_factory=None):
# tmp_path_factory is unavailable in setup_method; use a TemporaryDirectory.
import tempfile as _tempfile
self._tmpdir = _tempfile.TemporaryDirectory()
self.repo_root = Path(self._tmpdir.name)
# _validate_repo_root requires .git or .code-review-graph.
(self.repo_root / ".code-review-graph").mkdir()
# find_project_root / get_db_path look here for the DB.
from code_review_graph.incremental import get_db_path
self.db_path = get_db_path(self.repo_root)
self.store = GraphStore(str(self.db_path))
self._seed_graph()

def teardown_method(self):
self.store.close()
self._tmpdir.cleanup()

def _seed_graph(self):
# Production function with an unconventional name so the
# naming-convention fallback (test_<name> / Test<name>) cannot match.
self.store.upsert_node(NodeInfo(
kind="File", name="/src/calc.py", file_path="/src/calc.py",
line_start=1, line_end=20, language="python",
))
self.store.upsert_node(NodeInfo(
kind="Function", name="combine", file_path="/src/calc.py",
line_start=1, line_end=5, language="python",
))
self.store.upsert_node(NodeInfo(
kind="File", name="/tests/spec.py", file_path="/tests/spec.py",
line_start=1, line_end=20, language="python",
))
self.store.upsert_node(NodeInfo(
kind="Test", name="verify_combine_behaviour",
file_path="/tests/spec.py",
line_start=1, line_end=5, language="python", is_test=True,
))
# Parser-canonical direction: source=production, target=test.
self.store.upsert_edge(EdgeInfo(
kind="TESTED_BY",
source="/src/calc.py::combine",
target="/tests/spec.py::verify_combine_behaviour",
file_path="/tests/spec.py", line=1,
))
self.store.commit()
# Release the writer connection so query_graph can open its own.
self.store.close()

def test_query_graph_tests_for_finds_direct_edge(self):
from code_review_graph.tools import query_graph
result = query_graph(
pattern="tests_for",
target="/src/calc.py::combine",
repo_root=str(self.repo_root),
)
assert result["status"] == "ok"
qns = {r["qualified_name"] for r in result["results"]}
assert "/tests/spec.py::verify_combine_behaviour" in qns


class TestGetDocsSection:
"""Tests for the get_docs_section tool."""

Expand Down
Loading