Skip to content

Feature/bus job chain, Allows adding multiple tasks to be executed sequentially.#1082

Open
seth-shi wants to merge 5 commits intohibiken:masterfrom
seth-shi:feature/bus-chain
Open

Feature/bus job chain, Allows adding multiple tasks to be executed sequentially.#1082
seth-shi wants to merge 5 commits intohibiken:masterfrom
seth-shi:feature/bus-chain

Conversation

@seth-shi
Copy link
Copy Markdown

Add Chain Task Support

Summary

Add NewChainTask() for sequential task execution. Tasks run in order, chain stops on failure.

Usage

// Create chain
chain := asynq.NewChainTask(
    asynq.NewTask("task1", payload1),
    asynq.NewTask("task2", payload2),
    asynq.NewTask("task3", payload3),
)

// Enqueue like normal task
client.Enqueue(chain)

// Handlers - no special logic needed
mux.HandleFunc("task1", func(ctx context.Context, t *asynq.Task) error {
    // ... process task1
    return nil  // success -> task2 runs
})

Features

  • ✅ Tasks execute sequentially
  • ✅ Stops on first failure
  • ✅ Supports all options (Queue, MaxRetry, Timeout, etc.)
  • ✅ Cross-queue support
  • ✅ Zero breaking changes
  • ✅ 18 tests, 100% pass rate

Files

  • chain.go (256 lines) - Core implementation
  • chain_test.go (1,135 lines) - Test suite
  • server.go - Register middleware

seth-shi added 3 commits October 25, 2025 13:56
…andling

- Updated task timeout comment for clarity in `chain_test.go`.
- Improved task verification by replacing `LPop` with `broker.Dequeue` for better reliability in multiple test cases.
- Increased wait time for task completion in `TestChainIntegration_EndToEnd` to ensure reliability.
- Added checks for task options and deadlines in chain tasks to ensure correct behavior.
@seth-shi seth-shi changed the title Feature/bus chain Feature/bus job chain, Allows adding multiple tasks to be executed sequentially. Oct 25, 2025
seth-shi added 2 commits October 26, 2025 10:01
- Introduced `TestChainWithDeadlineOption` to verify that the Deadline option is preserved during chain execution.
- Added `TestChainWithProcessAtOption` to ensure the ProcessAt option is correctly handled and scheduled.
- Updated comments in `chain.go` to clarify JSON marshaling behavior for time-related options.
@seth-shi
Copy link
Copy Markdown
Author

Run these three commands. Note that these three commands can be run simultaneously.
job flow is:

  1. consumer1 start listen queue1
  2. consumer2 start listen queue2
  3. producer start (add 3 tasks)
  4. consumer1 execute task1
  5. consumer2 execute task2
  6. consumer3 execute task3
go run main.go producer 
========================================
✓ Chain task sent successfully!
========================================
  Task ID: ad7952a4-48f5-4faf-8a03-ce58890af366    
  Initial Queue: queue1
  Task Chain:
    1. task:1 → queue1
    7. task:2 → queue2
    8. task:3 → queue1
  Retention: 24h0m0s
  Tip: Completed tasks will appear in asynqmon's Completed tab
========================================

go run main.go consumer1
========================================
Consumer1 started [Queue: queue1]
========================================
Redis: localhost:6700
Concurrency: 5
Waiting for tasks...
Press Ctrl+C to stop
========================================
asynq: pid=28160 2025/10/26 03:12:39.014155 INFO: Starting processing
asynq: pid=28160 2025/10/26 03:12:39.014155 INFO: Send signal TERM or INT to terminate the process    

[Consumer1][queue1][11:13:53] Started: Task 1 (Type: task:1)
  [Consumer1][queue1][11:13:54] Task 1 processing... (1/3s)
  [Consumer1][queue1][11:13:55] Task 1 processing... (2/3s)
  [Consumer1][queue1][11:13:56] Task 1 processing... (3/3s)
[Consumer1][queue1][11:13:56] ✓ Completed: Task 1
----------------------------------------

[Consumer1][queue1][11:14:01] Started: Task 3 (Type: task:3)
  [Consumer1][queue1][11:14:02] Task 3 processing... (1/3s)
  [Consumer1][queue1][11:14:03] Task 3 processing... (2/3s)
  [Consumer1][queue1][11:14:04] Task 3 processing... (3/3s)
[Consumer1][queue1][11:14:04] ✓ Completed: Task 3
----------------------------------------

go run main.go consumer2
========================================
Consumer2 started [Queue: queue2]
========================================
Redis: localhost:6700
Concurrency: 5
Waiting for tasks...
Press Ctrl+C to stop
========================================
asynq: pid=3636 2025/10/26 03:12:43.256777 INFO: Starting processing
asynq: pid=3636 2025/10/26 03:12:43.256777 INFO: Send signal TERM or INT to terminate the process     

[Consumer2][queue2][11:13:57] Started: Task 2 (Type: task:2)
  [Consumer2][queue2][11:13:58] Task 2 processing... (1/3s)
  [Consumer2][queue2][11:13:59] Task 2 processing... (2/3s)
  [Consumer2][queue2][11:14:00] Task 2 processing... (3/3s)
[Consumer2][queue2][11:14:00] ✓ Completed: Task 2
----------------------------------------

this is a example code

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/hibiken/asynq"
)

const (
	TypeTask1 = "task:1"
	TypeTask2 = "task:2"
	TypeTask3 = "task:3"
)

type TaskPayload struct {
	Name string `json:"name"`
}

var (
	redisAddr = "localhost:6700"
)

func main() {
	if len(os.Args) < 2 {
		os.Exit(1)
	}

	mode := os.Args[1]

	switch mode {
	case "producer":
		runProducer()
	case "consumer1":
		runConsumer("queue1", "Consumer1")
	case "consumer2":
		runConsumer("queue2", "Consumer2")
	default:
		fmt.Printf("Unknown mode: %s\n\n", mode)
		os.Exit(1)
	}
}

// ============================================
// Producer Mode
// ============================================

func runProducer() {
	fs := flag.NewFlagSet("producer", flag.ExitOnError)
	retention := fs.Duration("retention", 24*time.Hour, "Completed task retention time (e.g., 1h, 24h)")
	fs.Parse(os.Args[2:])

	// Create Redis client connection
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr: redisAddr,
	})
	defer client.Close()

	// Create payloads for three tasks
	payload1, _ := json.Marshal(TaskPayload{Name: "Task 1"})
	payload2, _ := json.Marshal(TaskPayload{Name: "Task 2"})
	payload3, _ := json.Marshal(TaskPayload{Name: "Task 3"})

	// Create three tasks with different queues
	// Use Retention to keep completed tasks visible in asynqmon
	var opts []asynq.Option
	if *retention > 0 {
		opts = append(opts, asynq.Retention(*retention))
	}

	task1 := asynq.NewTask(TypeTask1, payload1,
		append(opts, asynq.Queue("queue1"))...)
	task2 := asynq.NewTask(TypeTask2, payload2,
		append(opts, asynq.Queue("queue2"))...)
	task3 := asynq.NewTask(TypeTask3, payload3,
		append(opts, asynq.Queue("queue1"))...)

	// Create chain task
	chainTask := asynq.NewChainTask(task1, task2, task3)

	// Enqueue chain task
	info, err := client.Enqueue(chainTask)
	if err != nil {
		log.Fatalf("Failed to enqueue chain task: %v", err)
	}

	fmt.Println("========================================")
	fmt.Println("✓ Chain task sent successfully!")
	fmt.Println("========================================")
	fmt.Printf("  Task ID: %s\n", info.ID)
	fmt.Printf("  Initial Queue: %s\n", info.Queue)
	fmt.Printf("  Task Chain:\n")
	fmt.Printf("    1. %s → queue1\n", TypeTask1)
	fmt.Printf("    2. %s → queue2\n", TypeTask2)
	fmt.Printf("    3. %s → queue1\n", TypeTask3)
	if *retention > 0 {
		fmt.Printf("  Retention: %s\n", *retention)
		fmt.Println("  Tip: Completed tasks will appear in asynqmon's Completed tab")
	} else {
		fmt.Println("  Tip: Completed tasks will be deleted immediately")
		fmt.Println("       Use --retention=24h to keep completed tasks")
	}
	fmt.Println("========================================")
}

// ============================================
// Consumer Mode
// ============================================

func runConsumer(queueName, consumerName string) {
	fs := flag.NewFlagSet("consumer", flag.ExitOnError)
	concurrency := fs.Int("concurrency", 5, "Number of concurrent workers")
	fs.Parse(os.Args[2:])

	// Create Redis server configuration
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{
			Concurrency: *concurrency,
			Queues: map[string]int{
				queueName: 10,
			},
		},
	)

	// Create task handler
	mux := asynq.NewServeMux()

	// Register handlers for all task types
	mux.HandleFunc(TypeTask1, makeHandler(queueName, consumerName))
	mux.HandleFunc(TypeTask2, makeHandler(queueName, consumerName))
	mux.HandleFunc(TypeTask3, makeHandler(queueName, consumerName))

	fmt.Println("========================================")
	fmt.Printf("%s started [Queue: %s]\n", consumerName, queueName)
	fmt.Println("========================================")
	fmt.Printf("Redis: %s\n", redisAddr)
	fmt.Printf("Concurrency: %d\n", *concurrency)
	fmt.Println("Waiting for tasks...")
	fmt.Println("Press Ctrl+C to stop")
	fmt.Println("========================================")

	// Start server to process tasks
	if err := srv.Run(mux); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

// makeHandler creates a handler function for specific queue
func makeHandler(queueName, consumerName string) func(context.Context, *asynq.Task) error {
	return func(ctx context.Context, t *asynq.Task) error {
		var payload TaskPayload
		if err := json.Unmarshal(t.Payload(), &payload); err != nil {
			return fmt.Errorf("failed to unmarshal payload: %w", err)
		}

		// Output task start info
		fmt.Printf("\n[%s][%s][%s] Started: %s (Type: %s)\n",
			consumerName,
			queueName,
			time.Now().Format("15:04:05"),
			payload.Name,
			t.Type())

		// Sleep for 3 seconds
		for i := 1; i <= 3; i++ {
			// Check if context is cancelled
			select {
			case <-ctx.Done():
				fmt.Printf("[%s][%s][%s] ⚠ Cancelled: %s\n",
					consumerName,
					queueName,
					time.Now().Format("15:04:05"),
					payload.Name)
				return ctx.Err()
			default:
			}

			time.Sleep(1 * time.Second)
			fmt.Printf("  [%s][%s][%s] %s processing... (%d/3s)\n",
				consumerName,
				queueName,
				time.Now().Format("15:04:05"),
				payload.Name,
				i)
		}

		// Output task completion info
		fmt.Printf("[%s][%s][%s] ✓ Completed: %s\n",
			consumerName,
			queueName,
			time.Now().Format("15:04:05"),
			payload.Name)
		fmt.Println("----------------------------------------")

		return nil
	}
}

@agorman
Copy link
Copy Markdown

agorman commented Oct 27, 2025

I need this!

return fmt.Errorf("asynq: server cannot run with nil handler")
}

// Automatically inject Chain Middleware
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this tbh. Might be worth considering this a candidate for the x folder.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not my changed,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't inject a middleware anywhere in the core. I would suggest leaving this up to the user. You could move this into the x/chain folder.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, i got it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have some question, can you give me some tips.

  • chain dependence root asynq module, but i will register chain middleware in asynq.Start
  • Task.w and Task.opts don't have export, i cannot use it in x folder (should i export these attribute or anther way)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a problem as well....Let me think about this.

@kamikazechaser kamikazechaser added pr-feat pr-alternative-available There is a workaround this issue with minimal work labels Nov 4, 2025
@codecov
Copy link
Copy Markdown

codecov bot commented Nov 4, 2025

Codecov Report

❌ Patch coverage is 89.25620% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.01%. Comparing base (4f00f52) to head (39859bd).
⚠️ Report is 34 commits behind head on master.

Files with missing lines Patch % Lines
chain.go 88.79% 8 Missing and 5 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1082      +/-   ##
==========================================
+ Coverage   67.13%   69.01%   +1.87%     
==========================================
  Files          29       30       +1     
  Lines        4300     5102     +802     
==========================================
+ Hits         2887     3521     +634     
- Misses       1135     1294     +159     
- Partials      278      287       +9     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pr-alternative-available There is a workaround this issue with minimal work pr-feat

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants