diff --git a/llms.txt b/llms.txt new file mode 100644 index 0000000..e920f31 --- /dev/null +++ b/llms.txt @@ -0,0 +1,421 @@ +# go-taskflow API Reference for Code Agents + +## Project Overview + +**go-taskflow** is a general-purpose task-parallel programming framework for Go, inspired by taskflow-cpp. It provides a DAG-based approach to define and execute concurrent tasks with complex dependencies. + +**Key Features:** +- Native Go concurrency model using goroutines +- Static, Subflow, Conditional, and Cyclic task types +- Priority-based task scheduling +- Built-in visualization (DOT format) and profiling (flamegraph) +- High extensibility for various use cases + +**Package Import:** `import gtf "github.com/noneback/go-taskflow"` + +--- + +## Core Components + +### 1. TaskFlow + +The main container for organizing tasks into a directed acyclic graph (DAG). + +```go +// Create a new TaskFlow +tf := gtf.NewTaskFlow("flow-name") + +// Reset the TaskFlow (unfreeze it) +tf.Reset() + +// Get the flow name +name := tf.Name() + +// Export visualization in DOT format +err := tf.Dump(os.Stdout) +``` + +--- + +### 2. Task + +The basic unit of work in a TaskFlow. Tasks can have dependencies and priorities. + +```go +// Create tasks +task := tf.NewTask("task-name", func() { + // task logic here +}) + +// Set task priority (HIGH, NORMAL, LOW) +task.Priority(gtf.HIGH) + +// Get task name +name := task.Name() +``` + +#### Task Dependency Methods + +```go +// Task A must complete before Task B (A -> B) +taskA.Precede(taskB) + +// Task B depends on Task A (same as above, reversed syntax) +taskB.Succeed(taskA) + +// Multiple dependencies +taskD.Succeed(taskA, taskB, taskC) // D runs after A, B, C complete +taskA.Precede(taskB, taskC, taskD) // A must complete before B, C, D +``` + +#### Task Priorities + +```go +gtf.HIGH // Highest priority +gtf.NORMAL // Default priority +gtf.LOW // Lowest priority +``` + +--- + +### 3. Executor + +Schedules and executes TaskFlows with configurable concurrency. + +```go +// Create executor with max goroutine concurrency +// MUST be larger than number of subflows, recommend > runtime.NumCPU() +executor := gtf.NewExecutor(1000) + +// Run a TaskFlow +executor.Run(tf) + +// Wait for all tasks to complete +executor.Wait() + +// Run and wait (common pattern) +executor.Run(tf).Wait() + +// Export profiling data in flamegraph format +err := executor.Profile(os.Stdout) +``` + +--- + +### 4. Task Types + +#### Static Task +A simple task that executes a function once. + +```go +task := tf.NewTask("static-task", func() { + fmt.Println("Executing static task") +}) +``` + +#### Subflow Task +A nested TaskFlow that can contain its own tasks and dependencies. + +```go +subflowTask := tf.NewSubflow("subflow-name", func(sf *gtf.Subflow) { + // Create tasks inside the subflow + innerTask := sf.NewTask("inner", func() { + fmt.Println("Inside subflow") + }) + + // Subflows can also contain nested subflows and conditions + nestedSubflow := sf.NewSubflow("nested", func(nested *gtf.Subflow) { + // ... + }) +}) +``` + +#### Condition Task +A task that returns a uint value to determine which successor to execute (branching logic). + +```go +// Create condition task +cond := tf.NewCondition("condition", func() uint { + if someCondition { + return 0 // Execute first successor + } + return 1 // Execute second successor +}) + +// Define branches - order matters! Index corresponds to return value +cond.Precede(branchTask0, branchTask1) // 0 -> branchTask0, 1 -> branchTask1 +``` + +**Important:** Condition tasks participate in both conditional control and looping. The return value must be less than the number of successors. + +--- + +## Usage Patterns + +### Basic Parallel Tasks + +```go +tf := gtf.NewTaskFlow("parallel-example") + +task1 := tf.NewTask("task1", func() { /* ... */ }) +task2 := tf.NewTask("task2", func() { /* ... */ }) +task3 := tf.NewTask("task3", func() { /* ... */ }) + +// All tasks run in parallel (no dependencies) +executor := gtf.NewExecutor(10) +executor.Run(tf).Wait() +``` + +### Sequential Dependencies + +```go +tf := gtf.NewTaskFlow("sequential") + +init := tf.NewTask("init", func() { /* ... */ }) +process := tf.NewTask("process", func() { /* ... */ }) +cleanup := tf.NewTask("cleanup", func() { /* ... */ }) + +// Sequential: init -> process -> cleanup +init.Precede(process) +process.Precede(cleanup) + +executor.Run(tf).Wait() +``` + +### Parallel Merge Pattern + +```go +tf := gtf.NewTaskFlow("merge-pattern") + +// Create parallel tasks +tasks := make([]*gtf.Task, 10) +for i := 0; i < 10; i++ { + tasks[i] = tf.NewTask(fmt.Sprintf("worker-%d", i), func() { + // Parallel work + }) +} + +// Final task waits for all workers +final := tf.NewTask("final", func() { + // Merge results +}) +final.Succeed(tasks...) // final depends on all tasks +``` + +### Conditional Branching + +```go +tf := gtf.NewTaskFlow("conditional") + +start := tf.NewTask("start", func() { /* ... */ }) + +cond := tf.NewCondition("check", func() uint { + if value > 100 { + return 0 // Take first branch + } + return 1 // Take second branch +}) + +branchA := tf.NewTask("branch-A", func() { /* ... */ }) +branchB := tf.NewTask("branch-B", func() { /* ... */ }) + +start.Precede(cond) +cond.Precede(branchA, branchB) // 0 -> A, 1 -> B +``` + +### Nested Subflows + +```go +tf := gtf.NewTaskFlow("main") + +// Main flow task +setup := tf.NewTask("setup", func() { /* ... */ }) + +// Subflow with its own internal structure +subflow := tf.NewSubflow("processing", func(sf *gtf.Subflow) { + step1 := sf.NewTask("step1", func() { /* ... */ }) + step2 := sf.NewTask("step2", func() { /* ... */ }) + step1.Precede(step2) +}) + +teardown := tf.NewTask("teardown", func() { /* ... */ }) + +setup.Precede(subflow) +subflow.Precede(teardown) +``` + +--- + +## Error Handling + +### Panic Behavior +- Unrecovered panics cancel the entire parent graph +- Remaining tasks are left incomplete +- Framework logs panic with stack trace + +### Manual Panic Handling + +```go +tf.NewTask("safe-task", func() { + defer func() { + if r := recover(); r != nil { + // Handle panic without canceling graph + log.Printf("Handled panic: %v", r) + } + }() + // Task logic +}) +``` + +--- + +## Visualization & Profiling + +### Generate DOT Graph + +```go +// Write DOT format to stdout or file +if err := tf.Dump(os.Stdout); err != nil { + log.Fatal(err) +} + +// Convert to image using graphviz: +// dot -Tsvg output.dot > graph.svg +``` + +### Generate Flamegraph Profile + +```go + +// Write flamegraph data +if err := executor.Profile(os.Stdout); err != nil { + log.Fatal(err) +} + +// Convert to flamegraph SVG using flamegraph.pl: +// cat profile.txt | flamegraph.pl > profile.svg +``` + +--- + +## Important Notes for Code Agents + +### Thread Safety +- Tasks may execute concurrently +- Use synchronization primitives (mutex, channels) for shared state +- Example with mutex: + +```go +var mu sync.Mutex +var results []int + +tf.NewTask("writer", func() { + mu.Lock() + defer mu.Unlock() + results = append(results, value) +}) +``` + +### Executor Concurrency +- Concurrency parameter must be > number of subflows +- Recommended: value larger than `runtime.NumCPU()` +- Too low concurrency can cause deadlocks with nested subflows + +### TaskFlow Lifecycle +1. Create TaskFlow: `NewTaskFlow(name)` +2. Create tasks and define dependencies +3. Create Executor: `NewExecutor(concurrency)` +4. Run: `executor.Run(tf).Wait()` +5. Optionally: `Dump()` for visualization, `Profile()` for profiling +6. Optionally: `Reset()` to reuse TaskFlow + +### Condition Task Gotchas +- Return value must be < number of successors +- Successor order in `Precede()` determines branch mapping +- Index 0 = first successor, Index 1 = second successor, etc. + +### Memory Management +- Tasks hold references to their graph +- Large numbers of tasks may retain significant memory +- Call `Reset()` to reuse TaskFlow and free intermediate state + +--- + +## Complete Example + +```go +package main + +import ( + "fmt" + "log" + "os" + "sync" + + gtf "github.com/noneback/go-taskflow" +) + +func main() { + var mu sync.Mutex + var results []int + + tf := gtf.NewTaskFlow("example") + + // Setup task + setup := tf.NewTask("setup", func() { + fmt.Println("Setting up...") + }) + + // Parallel workers + workers := make([]*gtf.Task, 5) + for i := 0; i < 5; i++ { + workers[i] = tf.NewTask(fmt.Sprintf("worker-%d", i), func() { + mu.Lock() + results = append(results, i) + mu.Unlock() + }).Priority(gtf.NORMAL) + } + + // All workers depend on setup + for _, w := range workers { + w.Succeed(setup) + } + + // Final aggregation + final := tf.NewTask("final", func() { + fmt.Printf("Results: %v\n", results) + }) + final.Succeed(workers...) + + // Execute + executor := gtf.NewExecutor(10) + executor.Run(tf).Wait() + + // Export artifacts + tf.Dump(os.Stdout) + executor.Profile(os.Stdout) +} +``` + +--- + +## Use Cases + +1. **Data Pipelines**: Orchestrate data processing stages with complex dependencies +2. **AI Agent Workflows**: Define sequences with clear dependency structures +3. **Parallel Graph Processing**: Execute graph algorithms concurrently +4. **Build Systems**: Model build dependencies and parallel compilation +5. **Test Orchestration**: Run test suites with setup/teardown dependencies + +--- + +## Related Files + +- `taskflow.go` - TaskFlow container and lifecycle +- `task.go` - Task definition and dependency methods +- `executor.go` - Scheduling and execution logic +- `flow.go` - Task type builders (Static, Subflow, Condition) +- `graph.go` - Internal graph structure +- `node.go` - Node representation +- `visualizer.go` - DOT format export +- `profiler.go` - Profiling and flamegraph export