Add memory hashing, versioning, and forget support#187
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces memory versioning and lineage tracking, including content hashing for duplicate detection and a new update flow that preserves history by marking old records as superseded. Review feedback highlights several critical areas: a hard limit on lineage retrieval that could impact data deletion, potential consistency issues allowing multiple 'current' versions, and opportunities to optimize network performance by reducing redundant data transmission. There is also a noted inconsistency between the hard deletion implementation and the soft deletion checks used during retrieval.
| for match in search_fn( | ||
| filters={"parent_memory_id": parent_memory_id}, | ||
| top_k=100, | ||
| ) or []: | ||
| ids.add(match.id) |
There was a problem hiding this comment.
The use of top_k=100 here creates a hard limit on the number of versions that can be 'forgotten' in a single lineage. If a memory has been updated more than 100 times, older versions will remain in the vector store, potentially violating privacy requirements or causing data leaks. Consider using a much larger top_k or implementing a paginated search if the underlying store supports it.
| ) | ||
| return await self._vector_add(op, domain, user_id) | ||
|
|
||
| self._mark_memory_superseded(op.embedding_id, previous, new_id) |
There was a problem hiding this comment.
There is a potential consistency issue here. If op.embedding_id refers to a version that is already superseded (e.g., due to a race condition or stale retrieval), this logic will mark that old version as superseded by the new one, but it won't affect the actual current version of the lineage. This results in multiple versions having is_current: True simultaneously. You should ensure that only one version per lineage is marked as current, perhaps by searching for the existing current version of the lineage before performing the update.
| def _set_parent_memory_id(self, memory_id: Optional[str]) -> None: | ||
| if not memory_id: | ||
| return | ||
| try: | ||
| self.vector_store.update( | ||
| id=memory_id, | ||
| metadata={"parent_memory_id": memory_id}, | ||
| ) | ||
| except Exception as exc: | ||
| logger.warning("Could not set parent_memory_id for %s: %s", memory_id, exc) |
There was a problem hiding this comment.
This method introduces an unnecessary network round-trip for every new memory added. Since BaseVectorStore.add allows providing explicit IDs, you can generate a UUID in the Weaver, include it in the initial metadata as parent_memory_id, and pass it to the add call. This would eliminate the need for a follow-up update call.
| self.vector_store.update( | ||
| id=memory_id, | ||
| text=previous.get("content"), | ||
| embedding=previous.get("embedding"), | ||
| metadata={ | ||
| "is_current": False, | ||
| "superseded_by": superseded_by, | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Passing text and embedding back to the vector store during a metadata-only update is redundant and increases payload size. Most vector store providers (like Pinecone) allow updating metadata without re-sending the vector data. You should check if your BaseVectorStore implementation supports metadata-only updates and omit these fields if possible.
References
- Avoid redundant data transmission to external services to improve performance and reduce costs.
| async def _vector_delete(self, op: Operation) -> ExecutedOp: | ||
| success = self.vector_store.delete(ids=[op.embedding_id]) | ||
| ids = self._lineage_ids_for_forget(op.embedding_id) | ||
| success = self.vector_store.delete(ids=ids) |
There was a problem hiding this comment.
The DELETE operation performs a hard delete from the vector store, yet the retrieval logic in retrieval.py and judge.py explicitly checks for a forgotten_at metadata field. If the intention of 'forget support' is to allow for soft deletes or auditing, _vector_delete should probably update the lineage with a forgotten_at timestamp instead of calling delete(). If hard deletion is intended, the forgotten_at checks in the retrieval paths are dead code.
|
Addressed the Gemini review feedback in 5bc7006:
Tests: |
|
Hi @kodahhhhh can you please discuss in the issue thread about your approach |
|
| Filename | Overview |
|---|---|
| src/pipelines/weaver.py | Core change: adds SHA-256 content hashing, lineage versioning (ADD creates self-referential root; UPDATE writes a new vector and marks old versions superseded), and full-lineage forget on DELETE. Batch DELETE lost its try/except wrapper, which is a regression. |
| src/agents/judge.py | Adds _active_memory_results helper and applies it at all search return sites; search_by_text and search_by_metadata now over-fetch (top_k * 5) so enough active results survive the inactive filter. Logic looks correct. |
| src/pipelines/retrieval.py | Adds _is_active_memory filter to summary-domain and profile-catalog search loops; over-fetches (top_k * 2) then skips inactive entries. Clean and self-contained change. |
| tests/conftest.py | One-line fix: update now merges metadata instead of replacing it, correctly reflecting the partial-update contract needed by the new lineage marking logic. |
| tests/test_deterministic_memory_layer.py | Extends FakeVectorStore with get, partial-update in update, and optional ids in add; adds three focused tests for duplicate detection, versioned update, and full-lineage delete. |
| tests/integration/test_weaver_pipeline.py | Updates integration assertions to match the new version-per-UPDATE semantics, but replaces the standard import with a fragile importlib.exec_module path-based loader that will break on layout changes. |
Sequence Diagram
sequenceDiagram
participant C as Caller
participant W as Weaver.execute
participant FA as flush_add_batch
participant FD as flush_delete_batch
participant VS as VectorStore
C->>W: execute(JudgeResult)
W->>FA: flush_add_batch()
FA->>VS: search_by_metadata(content_hash)
VS-->>FA: matches
alt duplicate found
FA-->>W: OpStatus.SKIPPED
else new content
FA->>VS: add(texts, embeddings, ids, metadata)
VS-->>FA: stored_ids
FA-->>W: OpStatus.SUCCESS (new_id)
end
W->>VS: get(embedding_id)
VS-->>W: previous doc
W->>VS: search_by_metadata(parent_memory_id)
VS-->>W: lineage matches
alt content_hash unchanged
W-->>C: OpStatus.SKIPPED
else new content
W->>VS: add(new version vector)
W->>VS: "update(old ids, is_current=False)"
W-->>C: OpStatus.SUCCESS (new_id)
end
W->>FD: flush_delete_batch()
FD->>VS: get(embedding_id)
VS-->>FD: target doc
FD->>VS: search_by_metadata(parent_memory_id)
VS-->>FD: all versions
FD->>VS: delete(all lineage ids)
FD-->>W: OpStatus.SUCCESS / FAILED
W-->>C: WeaverResult
Comments Outside Diff (2)
-
src/pipelines/weaver.py, line 245-256 (link)Store-returned IDs discarded silently in batch ADD
idsis re-assigned with the return value fromvector_store.add, butsuccessful_ids(the pre-generated UUIDs) are used forExecutedOp.new_id. If a vector store implementation ignores the suppliedidsand generates its own, thenew_idrecorded in everyExecutedOpwill be wrong and subsequent UPDATE/DELETE operations referencing those IDs will fail. Consider using the store-returned IDs instead, or assert that they match. -
tests/integration/test_weaver_pipeline.py, line 565-569 (link)Fragile
importlib.exec_modulereplaces a working importThe original file used
from src.pipelines.weaver import Weaverand worked fine. The new approach hardcodes an OS path and re-executes the module outside the normal package system. If thesrc/layout changes, or if a CI runner runs tests from a different working directory,weaver_spec.loader.exec_module(weaver_module)will either fail to find the file or re-execute the module in an unexpected package context. There's no apparent benefit over the standard import.
Reviews (1): Last reviewed commit: "Address memory lineage review feedback" | Re-trigger Greptile
| if valid_ops: | ||
| loop = asyncio.get_running_loop() | ||
| try: | ||
| success = await loop.run_in_executor(None, partial(self.vector_store.delete, ids=ids_to_delete)) | ||
| status = OpStatus.SUCCESS if success else OpStatus.FAILED | ||
| for op in valid_ops: | ||
| executed_ops.append(ExecutedOp( | ||
| type=op.type, status=status, | ||
| embedding_id=op.embedding_id | ||
| )) | ||
| except Exception as exc: | ||
| logger.error("Vector batch DELETE failed: %s", exc) | ||
| for op in valid_ops: | ||
| executed_ops.append(ExecutedOp( | ||
| type=op.type, status=OpStatus.FAILED, | ||
| embedding_id=op.embedding_id, error=str(exc) | ||
| )) | ||
| for op in valid_ops: | ||
| executed_ops.append(await self._vector_delete(op)) | ||
|
|
||
| delete_batch_ops.clear() |
There was a problem hiding this comment.
Batch DELETE drops exception handling
The old flush_delete_batch wrapped the delete call in try/except and appended OpStatus.FAILED entries on error. The new per-op loop has no exception guard: if _vector_delete raises (e.g., vector_store.get or vector_store.delete throws), the exception escapes flush_delete_batch, the delete_batch_ops.clear() on line 291 is never reached, and the same ops may be reprocessed on the next call. Compare with flush_add_batch, which still has the try/except pattern.
| new_id = self._new_memory_id() | ||
| metadata = _memory_metadata( | ||
| op.content, | ||
| domain, | ||
| user_id, | ||
| parent_memory_id=new_id, | ||
| ) |
There was a problem hiding this comment.
Version 1 memories are their own
parent_memory_id
new_id is generated, placed in parent_memory_id, and then used as the record's own ID. This self-referential root is deliberate (it lets _lineage_matches find all versions of a memory), but adding a comment would prevent future maintainers from "fixing" it incorrectly.
| new_id = self._new_memory_id() | |
| metadata = _memory_metadata( | |
| op.content, | |
| domain, | |
| user_id, | |
| parent_memory_id=new_id, | |
| ) | |
| # For a brand-new memory, use the record's own ID as its parent_memory_id. | |
| # This self-referential root is the stable lineage key shared by all | |
| # future versions of this memory (see _lineage_matches / _lineage_ids_for_forget). | |
| new_id = self._new_memory_id() | |
| metadata = _memory_metadata( | |
| op.content, | |
| domain, | |
| user_id, | |
| parent_memory_id=new_id, | |
| ) |
Summary
parent_memory_id,version,is_current) and preserve prior versions by adding new current vectors on UPDATEFixes #166
Tests
.venv/bin/python -m pytest tests/test_deterministic_memory_layer.py tests/integration/test_weaver_pipeline.py -qgit diff --check.venv/bin/python -m compileall -q src/agents/judge.py src/pipelines/retrieval.py src/pipelines/weaver.py tests/test_deterministic_memory_layer.py tests/integration/test_weaver_pipeline.py