Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions src/lore/cli/commands/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,67 @@ def cmd_relationships(args: argparse.Namespace) -> None:


def cmd_graph_backfill(args: argparse.Namespace) -> None:
    """Run extraction on memories that don't yet have entity_mentions.

    Repeatedly calls the server-side ``POST /v1/graph/backfill`` endpoint
    and prints the accumulated totals. Draining stops when a page reports
    neither processed nor failed memories, or when the page cap is hit.

    The earlier SDK-side path (``Lore(knowledge_graph=True)`` followed by
    ``lore.graph_backfill``) silently returned 0 in HTTP-store mode, so it
    is no longer used here; the endpoint runs the same extraction service
    as the create-time hook.

    Args:
        args: Parsed CLI arguments. ``args.limit`` is the requested page
            size (clamped to 100 per request). ``args.project`` is
            optional; when set it is sent as the project filter.

    Raises:
        SystemExit: With status 1 when no HTTP backend is available, when
            the server answers with a non-200 status, or when the response
            body is not a JSON object.
    """
    from lore import Lore

    max_page_size = 100  # per-request ceiling sent as ``limit``
    max_pages = 50  # contains a runaway loop if the server never drains

    lore = Lore()
    # A single ``finally`` guarantees the client is closed on every exit
    # path, including ``sys.exit`` and exceptions raised by the request.
    try:
        store = getattr(lore, "_store", None)
        request_fn = getattr(store, "_request", None)
        if request_fn is None:
            print(
                "graph-backfill requires the HTTP backend. Set LORE_API_URL "
                "and LORE_API_KEY (or run `lore setup`).",
                file=sys.stderr,
            )
            sys.exit(1)

        body: dict = {"limit": min(args.limit, max_page_size)}
        if getattr(args, "project", None):
            body["project"] = args.project

        total_processed = 0
        total_failed = 0
        pages = 0
        while pages < max_pages:
            resp = request_fn("POST", "/v1/graph/backfill", json=body)
            try:
                data = resp.json() if resp.content else {}
            except ValueError:
                # Error pages from proxies are often HTML/plain text; report
                # them as a failed call instead of crashing with a traceback.
                data = None

            if resp.status_code != 200 or not isinstance(data, dict):
                if isinstance(data, dict):
                    msg = (
                        data.get("message")
                        or data.get("detail")
                        or data.get("error")
                        or "?"
                    )
                else:
                    msg = "response body is not a JSON object"
                print(f"backfill failed: {resp.status_code}: {msg}", file=sys.stderr)
                sys.exit(1)

            if not data.get("enabled", True):
                print(
                    "graph extraction is disabled "
                    "(set LORE_GRAPH_EXTRACTION_ENABLED=true or install `claude`).",
                    file=sys.stderr,
                )
                return

            # ``or 0`` tolerates a null counter in the response.
            page_processed = int(data.get("processed") or 0)
            page_failed = int(data.get("failed") or 0)
            total_processed += page_processed
            total_failed += page_failed
            pages += 1
            # Drain until the server reports nothing more to do.
            if page_processed + page_failed == 0:
                break
    finally:
        lore.close()

    print(
        f"Backfill complete: processed={total_processed} "
        f"failed={total_failed} pages={pages}."
    )


def cmd_topics(args) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/lore/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from lore.server.routes.conversations import router as conversations_router
from lore.server.routes.export import router as export_router
from lore.server.routes.graph import router as graph_router
from lore.server.routes.graph_backfill import router as graph_backfill_router
from lore.server.routes.ingest import router as ingest_router
from lore.server.routes.keys import router as keys_router
from lore.server.routes.lessons import router as lessons_router
Expand Down Expand Up @@ -177,6 +178,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
app.include_router(snapshots_router)
app.include_router(export_router)
app.include_router(graph_router)
app.include_router(graph_backfill_router)
app.include_router(review_router)
app.include_router(topics_router)
app.include_router(setup_validation_router)
Expand Down
173 changes: 173 additions & 0 deletions src/lore/server/routes/graph_backfill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Graph endpoints — backfill entity / mention / relationship rows.

The create-time fire-and-forget hook in ``routes/memories.py`` and
``routes/observations.py`` covers new memories. This route covers the
historical case: memories already in the DB without ``entity_mentions``,
typically because they pre-date PR B's wiring or were created while
``LORE_GRAPH_EXTRACTION_ENABLED=false``.

Behavior:

* Default: walk memories with no rows in ``entity_mentions``, run
extraction on each, return per-result counts.
* ``force=true``: re-run extraction on every memory (up to ``limit``)
regardless of whether it already has mentions. Useful after a model
upgrade or prompt revision.
* Synchronous request shape with a small ``limit`` cap so a single
HTTP call always finishes; large backfills repeat the request.

Intentionally *not* implemented as a background job — that would need
queueing infra we don't have. Repeated calls with limit=N walk the
remaining rows; the LEFT JOIN on entity_mentions naturally skips
already-processed memories on the next call.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Optional

try:
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field
except ImportError:
raise ImportError("FastAPI is required. Install with: pip install lore-sdk[server]")

from lore.persistence.protocol import Store
from lore.server.auth import AuthContext, require_role
from lore.server.db import get_store
from lore.services import graph_extraction as graph_svc

# Module logger; the backfill handler uses it to report extraction tasks
# that raise instead of returning a result.
logger = logging.getLogger(__name__)

# Router for this module; routes declared on it are served under /v1/graph.
router = APIRouter(prefix="/v1/graph", tags=["graph"])

# Backfill cap: a single HTTP call processes at most this many memories.
# Larger backfills are run by repeating the request — the LEFT JOIN
# naturally skips already-processed rows on subsequent calls. Cap is
# generous enough that ``lore graph-backfill`` finishes on a typical
# session-buffer-sized run in one or two HTTP round trips, but small
# enough that no individual request runs unbounded under
# extraction-concurrency=2.
# Used as the upper bound (``le``) of ``BackfillRequest.limit``.
_MAX_BACKFILL_LIMIT = 100


class BackfillRequest(BaseModel):
    """Request body for ``POST /v1/graph/backfill``."""

    # How many memories to handle in this call; validated to lie between
    # 1 and ``_MAX_BACKFILL_LIMIT`` inclusive, 50 when omitted.
    limit: int = Field(default=50, ge=1, le=_MAX_BACKFILL_LIMIT)
    # When true, memories are selected regardless of existing mentions.
    force: bool = Field(default=False)
    # Optional project filter forwarded to the memory listing.
    project: Optional[str] = Field(default=None)


class BackfillResultItem(BaseModel):
    """Per-memory outcome of one backfill extraction."""

    # Identifier of the memory this entry describes.
    memory_id: str
    # Counters copied from the extraction result; all zero when the
    # extraction task raised.
    entities_inserted: int
    entities_reused: int
    mentions_inserted: int
    relationships_inserted: int
    # Error text when extraction failed or raised; absent on success.
    error: Optional[str] = Field(default=None)


class BackfillResponse(BaseModel):
    """Summary returned by ``POST /v1/graph/backfill``."""

    # Number of memories whose extraction finished without an error.
    processed: int
    # Number of memories whose extraction raised or reported an error.
    failed: int
    # One entry per memory handled in this call.
    results: list[BackfillResultItem]
    # False when graph extraction is turned off and no work was attempted.
    enabled: bool


@router.post("/backfill", response_model=BackfillResponse)
async def backfill_graph(
    body: BackfillRequest,
    auth: AuthContext = Depends(require_role("writer", "admin")),
    store: Store = Depends(get_store),
) -> BackfillResponse:
    """Extract graph data for one batch of memories and summarize the outcome.

    By default the batch is the caller's memories that have no mentions
    yet. With ``force=true`` the batch comes from the generic memory
    listing instead, so memories that already have mentions are extracted
    again (use after a prompt / model change).

    Returns ``enabled=False`` with empty counters when graph extraction is
    switched off, and ``enabled=True`` with empty counters when there is
    nothing to process.
    """
    # Extraction switched off: say so instead of doing any work.
    if not graph_svc.is_enabled():
        return BackfillResponse(
            processed=0, failed=0, results=[], enabled=False,
        )

    # Choose the batch. The store only exposes the "without mentions"
    # query, so the force path goes through the service-layer listing.
    if not body.force:
        memories = await store.list_memories_without_mentions(
            auth.org_id, project=body.project, limit=body.limit,
        )
    else:
        memories = await _list_for_force(
            store, org_id=auth.org_id, project=body.project, limit=body.limit,
        )

    if not memories:
        return BackfillResponse(processed=0, failed=0, results=[], enabled=True)

    async def _extract(memory):
        return await graph_svc.extract_and_persist(
            store, org_id=auth.org_id, memory_id=memory.id,
            content=memory.content, context=memory.context,
        )

    # Launch every extraction at once; the service-level semaphore caps the
    # real subprocess fan-out, so this does not flood the host.
    # ``return_exceptions=True`` keeps one crash from aborting the batch.
    pending = [_extract(memory) for memory in memories]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    report: list[BackfillResultItem] = []
    ok_count = 0
    error_count = 0
    for memory, outcome in zip(memories, outcomes):
        if isinstance(outcome, BaseException):
            # The task itself raised: record a zeroed entry with the repr.
            logger.warning(
                "graph backfill task crashed for %s: %r", memory.id, outcome,
            )
            error_count += 1
            entry = BackfillResultItem(
                memory_id=memory.id,
                entities_inserted=0,
                entities_reused=0,
                mentions_inserted=0,
                relationships_inserted=0,
                error=f"task crashed: {outcome!r}",
            )
        else:
            # The task returned a result; it may still carry an error.
            if outcome.error:
                error_count += 1
            else:
                ok_count += 1
            entry = BackfillResultItem(
                memory_id=outcome.memory_id,
                entities_inserted=outcome.entities_inserted,
                entities_reused=outcome.entities_reused,
                mentions_inserted=outcome.mentions_inserted,
                relationships_inserted=outcome.relationships_inserted,
                error=outcome.error,
            )
        report.append(entry)

    return BackfillResponse(
        processed=ok_count, failed=error_count,
        results=report, enabled=True,
    )


async def _list_for_force(
    store: Store, *, org_id: str, project: Optional[str], limit: int,
):
    """Fetch the batch for the ``force=true`` path, ignoring existing mentions.

    Delegates to the service-layer ``list_memories`` with the given org,
    project and limit. Per the original design note, the persistence layer
    has no "list everything" hook that respects ``org_id`` while the
    service layer does, so reusing it avoids adding another store method.
    """
    # Imported at call time, as in the rest of this module's lazy imports.
    from lore.services import memories as mem_svc

    listing = mem_svc.list_memories(
        store,
        org_id=org_id,
        project=project,
        limit=limit,
    )
    return await listing
12 changes: 12 additions & 0 deletions src/lore/server/routes/memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ async def create_memory(
store, memory_id=stored.id, content=stored.content, context=stored.context,
))

# Fire-and-forget graph extraction. Auto-on iff `claude` is on PATH;
# explicit override via LORE_GRAPH_EXTRACTION_ENABLED. The semaphore
# inside the service caps concurrency so a burst of creates doesn't
# spawn unbounded subprocesses.
from lore.services import graph_extraction as graph_svc

if graph_svc.is_enabled():
asyncio.create_task(graph_svc.extract_and_persist(
store, org_id=auth.org_id, memory_id=stored.id,
content=stored.content, context=stored.context,
))

return MemoryCreateResponse(id=stored.id)


Expand Down
13 changes: 13 additions & 0 deletions src/lore/server/routes/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ async def _embed(text: str):
scope=body.scope,
)
stored = await _create_observation(store, obs, _embed)

# Fire-and-forget graph extraction. Observations are the bulk-write
# tier the dream/capture subagents save through; without this the
# graph stays empty even when `claude` is installed (PR B / spec
# 2026-05-08-lore-graph-population-design.md).
from lore.services import graph_extraction as graph_svc

if graph_svc.is_enabled():
asyncio.create_task(graph_svc.extract_and_persist(
store, org_id=auth.org_id, memory_id=stored.id,
content=stored.content, context=stored.context,
))

return ObservationCreateResponse(id=stored.id)


Expand Down
8 changes: 7 additions & 1 deletion tests/test_enrichment_memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ async def fake_get_store():
with patch("lore.server.routes.memories.get_store", fake_get_store):
with patch("lore.server.routes.memories.require_role", return_value=lambda: mock_auth):
with patch("lore.server.routes.retrieve._get_embedder", return_value=mock_embedder):
yield TestClient(app)
# PR B (graph-extraction wiring) adds a second
# asyncio.create_task in the create handler. Force it
# off here so the existing ``assert_called_once`` checks
# in this file stay deterministic regardless of whether
# ``claude`` is on PATH in the test environment.
with patch.dict("os.environ", {"LORE_GRAPH_EXTRACTION_ENABLED": "false"}):
yield TestClient(app)


class TestEnrichmentTrigger:
Expand Down
Loading
Loading