Skip to content

Extraction worker#3

Open
itssubhodiproy wants to merge 4 commits into
mainfrom
ex-worker
Open

Extraction worker#3
itssubhodiproy wants to merge 4 commits into
mainfrom
ex-worker

Conversation

@itssubhodiproy

Copy link
Copy Markdown
Owner

No description provided.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new Extraction Worker service and the corresponding API Server internal endpoints to support table-level extraction runs (manifest fetch, chunk vector search, bulk cell updates) and real-time progress via Redis pub/sub.

Changes:

  • Enabled ext tasks at the repo root and added the Extraction Worker service scaffolding (uv/pyproject, Taskfile, worker runtime).
  • Implemented the Extraction Worker pipeline: embedding → pgvector chunk search (API) → rerank → prompt/LLM structured output → validation → bulk update + Redis events.
  • Added API Server extraction endpoints (/tables/{id}/run, /cells/{cell_id}/rerun, /extraction-manifest, /cells/bulk-update) and a document chunk vector search endpoint (/documents/{id}/chunk-search), plus updated docs.

Reviewed changes

Copilot reviewed 34 out of 37 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
Taskfile.yml Enables ext task include for the new service
services/extraction-worker/uv.lock Locks Extraction Worker Python dependencies
services/extraction-worker/Taskfile.yml Adds dev/build/test/lint/format tasks for extraction worker
services/extraction-worker/pyproject.toml Defines extraction worker package + runtime requirements
services/extraction-worker/app/types.py Worker-side task/event + API payload models
services/extraction-worker/app/rag/reranker.py Cohere reranker integration
services/extraction-worker/app/rag/pipeline.py Builds RAG context from chunk search + rerank
services/extraction-worker/app/main.py Worker entrypoint
services/extraction-worker/app/llm/response_parser.py Logical validation of structured LLM output
services/extraction-worker/app/llm/prompts.py System/user prompt construction + type hints
services/extraction-worker/app/llm/client.py OpenAI structured-output client abstraction
services/extraction-worker/app/handler.py Table-run orchestration, concurrency, retries, bulk updates, events
services/extraction-worker/app/embedding/client.py Cohere embedding client
services/extraction-worker/app/consumer.py Redis BRPOP consumer loop + retry handling
services/extraction-worker/app/config.py Worker settings/env configuration
services/extraction-worker/app/clients/redis_client.py Queue + pub/sub publishing client
services/extraction-worker/app/clients/api_client.py Internal API client for manifest/search/bulk-update
services/api-server/app/utils/redis_tasks.py Adds Redis enqueue function for extraction runs
services/api-server/app/services/extraction_service.py Adds extraction run, manifest, bulk update, and rerun logic
services/api-server/app/services/document_service.py Adds embedding-based chunk search implementation
services/api-server/app/schemas/extraction.py Adds extraction request/response schemas
services/api-server/app/schemas/document.py Adds chunk-search request/response schemas
services/api-server/app/schemas/cell.py Adds shared CellStatusType literal
services/api-server/app/routes/extraction.py Adds extraction API routes
services/api-server/app/routes/documents.py Adds internal chunk-search endpoint
services/api-server/app/main.py Registers extraction router
docs/TODO.md Adds roadmap notes
docs/services/extraction-worker.md Updates extraction worker design doc to table-level trigger model
docs/services/api-server.md Updates API server design doc for extraction changes
docs/product.md Updates user journey/progress UI description
docs/infrastructure.md Updates infra docs for new queue/task semantics
docs/frontend.md Updates frontend docs for progress bar
docs/architecture.md Updates extraction flow to table-level trigger model
docs/api.md Updates API docs for chunk-search + extraction endpoints
.env.example Adds OPENAI_BASE_URL example

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +8 to +12
TaskType = Literal["run_all", "run_rerun"]


class ExtractionTask(BaseModel):
table_id: str
Comment on lines +122 to +124
if retryable_failures and task.retry_count < settings.MAX_RETRIES:
await self._enqueue_retryable_failures(task, retryable_failures)
return
Comment on lines +272 to +291
retry_tasks = 0

for failure in failures:
if failure.cell is None:
continue

retry_tasks += 1
await self._deps.redis.enqueue_extraction_task(
ExtractionTask(
table_id=task.table_id,
type=task.type,
cell_id=failure.cell.cell_id,
retry_count=retry_count,
)
)

logging.info(
"Retrying %d extraction cell(s) for table %s (attempt %d/%d)",
retry_tasks,
task.table_id,
raise LookupError("Cell not found")
# Enqueue before commit to prevent stuck 'extracting' status on queue failure
cell.status = "extracting"
await enqueue_extraction_run(self.redis, table_id=table.id, type="rerun_cell", cell_id=cell.id)
Comment thread docs/api.md
Comment on lines +603 to +611
Request: [
{
"cell_id": "cell_xyz",
"status": "completed",
"answer": "12 months fees",
"reasoning": "...",
"source_references": [...]
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants