# Flow

This package provides a small orchestrator to run lightweight, database-backed flows on SurrealDB records. It exposes an Executor that registers flow handlers, selects eligible records, and prevents repeat processing by requiring handlers to mark their output field.

## Core ideas

- Flows are declarative. Use `@executor.flow(table, stamp, dependencies, priority)` to describe which table to watch, which field signals completion, which dependencies are required, and an optional priority for ordering. Flow definitions are stored in the `flow` table for observability.
- Handlers do the work. Decorated functions receive a record dictionary and should perform side effects (e.g., creating related rows) and then update the configured output field so the record is not reprocessed.
- Execution loops are flexible. Call `execute_flows_once()` to process any ready records one time, or `await executor.run()` to keep polling with exponential backoff until `executor.stop()` is called.
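The polling behaviour of `run()` can be sketched in plain `asyncio`. This is an illustrative approximation, not the package's implementation: the function name, parameters, and timings below are assumptions, and the real `Executor` manages its own state rather than taking callbacks.

```python
import asyncio
from typing import Callable

async def poll_with_backoff(
    process_once: Callable[[], int],
    stop_event: asyncio.Event,
    base: float = 0.5,
    cap: float = 8.0,
) -> None:
    """Toy polling loop: run one processing pass, then sleep with
    exponential backoff while no work is found."""
    delay = base
    while not stop_event.is_set():
        if process_once():
            delay = base  # work was found: reset the backoff
        else:
            delay = min(delay * 2, cap)  # idle: back off, up to the cap
        try:
            # Sleep, but wake immediately if stop() is signalled meanwhile.
            await asyncio.wait_for(stop_event.wait(), timeout=delay)
        except asyncio.TimeoutError:
            pass
```

Setting the event plays the role of `executor.stop()`: the loop finishes its current sleep (or wakes early) and returns.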

## Example

```python
from demo_unstruct_to_graph import flow
from kaig.db import DB

db = DB("mem://", "root", "root", "kaig", "demo")
executor = flow.Executor(db)

@executor.flow(table="document", stamp="flow_chunked", dependencies=["text"])
def chunk(record: flow.Record, flow: flow.Flow):
    _ = db.sync_conn.query(
        "CREATE chunk SET text = $text, document = $document RETURN NONE",
        {"text": record["text"], "document": record["id"]},
    )
    # Mark the stamp field so this document is not selected again.
    _ = db.sync_conn.query(
        "UPDATE $id SET flow_chunked = time::now() RETURN NONE",
        {"id": record["id"]},
    )

results = executor.execute_flows_once()
# results => {"chunk": processed_count}
```

See `flow/tests/flow_test.py` for a complete chained example that first chunks new documents and then enriches the resulting chunks.
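The selection, stamping, and chaining mechanics can be modelled with a self-contained toy, where plain dicts stand in for SurrealDB rows. `ToyExecutor` and its field names are illustrative only, not the package's API; the priority ordering rule is an assumption for this sketch.

```python
class ToyExecutor:
    """Illustrative stand-in for flow.Executor, using dicts as records."""

    def __init__(self):
        self._flows = []  # (name, stamp, dependencies, priority, handler)

    def flow(self, stamp, dependencies, priority=0):
        def register(handler):
            self._flows.append(
                (handler.__name__, stamp, dependencies, priority, handler)
            )
            return handler
        return register

    def execute_flows_once(self, records):
        results = {}
        # Higher-priority flows run first (ordering rule assumed here).
        for name, stamp, deps, _, handler in sorted(
            self._flows, key=lambda f: -f[3]
        ):
            count = 0
            for rec in records:
                ready = all(rec.get(d) is not None for d in deps)
                if ready and not rec.get(stamp):
                    handler(rec)  # the handler must set rec[stamp] itself
                    count += 1
            results[name] = count
        return results

ex = ToyExecutor()

@ex.flow(stamp="flow_chunked", dependencies=["text"], priority=1)
def chunk(rec):
    rec["chunks"] = rec["text"].split()
    rec["flow_chunked"] = True  # mark the output field: no reprocessing

@ex.flow(stamp="flow_enriched", dependencies=["chunks"])
def enrich(rec):
    rec["chunks"] = [c.upper() for c in rec["chunks"]]
    rec["flow_enriched"] = True
```

A record with `text` set is chunked on the first pass; because `chunk` runs first, `enrich` already sees the new `chunks` field in the same pass. A second pass processes nothing, since both stamp fields are set.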