Queue Integration

Jean-Marc Strauven edited this page Aug 6, 2025 · 1 revision

Laravel Flowpipe integrates with Laravel's queue system, so workflows can run asynchronously on background workers instead of inside the request that triggers them.

🧠 Core Concept

Queue integration allows you to:

  • Execute workflows asynchronously
  • Process long-running workflows without blocking HTTP requests
  • Scale workflow processing across multiple workers
  • Retry failed workflows automatically
  • Schedule workflows for future execution

🚀 Basic Queue Usage

Execute Flow Asynchronously

use Grazulex\LaravelFlowpipe\Flowpipe;

// Queue for background processing
Flowpipe::fromYaml('user-onboarding')
    ->send($userData)
    ->dispatch(); // Queues the flow instead of executing immediately

// Queue with delay
Flowpipe::fromYaml('welcome-email')
    ->send($userData)
    ->delay(now()->addMinutes(30))
    ->dispatch();

// Queue on specific connection/queue
Flowpipe::fromYaml('data-processing')
    ->send($data)
    ->onQueue('heavy-processing')
    ->onConnection('redis')
    ->dispatch();

YAML Queue Configuration

flow: heavy-data-processing
description: CPU-intensive data processing workflow

# Queue configuration at flow level
queue:
  connection: redis
  queue: data-processing
  delay: 0
  timeout: 3600        # 1 hour timeout
  tries: 3             # Retry 3 times on failure
  backoff: [60, 300]   # Backoff delays in seconds

send:
  dataset_id: 12345
  processing_options:
    format: "json"
    compression: true

steps:
  - type: action
    name: validate-dataset
    class: App\Flowpipe\Steps\ValidateDatasetStep
    
  - type: action
    name: process-data
    class: App\Flowpipe\Steps\ProcessDataStep
    queue:
      timeout: 1800    # Step-specific timeout (30 minutes)
      
  - type: action
    name: generate-report
    class: App\Flowpipe\Steps\GenerateReportStep

🔧 Advanced Queue Features

Step-Level Queueing

flow: mixed-execution-flow
description: Some steps run synchronously, others asynchronously

steps:
  # Runs immediately (synchronous)
  - type: action
    name: validate-input
    class: App\Flowpipe\Steps\ValidateInputStep
    
  # Queued for background processing
  - type: action
    name: heavy-processing
    class: App\Flowpipe\Steps\HeavyProcessingStep
    queue:
      connection: redis
      queue: heavy-tasks
      delay: 0
      
  # Runs immediately after heavy-processing completes
  - type: action
    name: finalize
    class: App\Flowpipe\Steps\FinalizeStep

Conditional Queueing

steps:
  - type: action
    name: process-order
    class: App\Flowpipe\Steps\ProcessOrderStep
    queue:
      # Only queue large orders
      when:
        field: order_total
        operator: greater_than
        value: 500
      connection: redis
      queue: large-orders
      priority: high
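
How Flowpipe evaluates the `when` block is not shown on this page. The sketch below is one plausible reading in plain PHP: look up `field` in the payload, compare it with `value` using `operator`, and queue the step only when the comparison holds. The function name `shouldQueue` and every operator other than `greater_than` are assumptions made for illustration, not part of the package's API.

```php
<?php

declare(strict_types=1);

/**
 * Decide whether a step should be queued, given its `when` condition.
 * Hypothetical helper: only `greater_than` appears in the example above;
 * the other operators are assumed for illustration.
 */
function shouldQueue(array $payload, array $when): bool
{
    $actual = $payload[$when['field']] ?? null;

    // A missing field never satisfies the condition, so the step runs inline.
    if ($actual === null) {
        return false;
    }

    return match ($when['operator']) {
        'greater_than' => $actual > $when['value'],
        'less_than'    => $actual < $when['value'],
        'equals'       => $actual == $when['value'],
        default        => throw new InvalidArgumentException(
            "Unsupported operator: {$when['operator']}"
        ),
    };
}

$when = ['field' => 'order_total', 'operator' => 'greater_than', 'value' => 500];

var_dump(shouldQueue(['order_total' => 750], $when)); // bool(true)
var_dump(shouldQueue(['order_total' => 120], $when)); // bool(false)
```

Under this reading, an order of exactly 500 is not queued, because `greater_than` is a strict comparison.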

Queue Priorities

flow: priority-processing
description: Different priorities based on business rules

steps:
  - type: action
    name: process-urgent-request
    class: App\Flowpipe\Steps\ProcessUrgentStep
    queue:
      priority: high
      
  - type: action
    name: process-normal-request
    class: App\Flowpipe\Steps\ProcessNormalStep
    queue:
      priority: normal
      
  - type: action
    name: process-bulk-data
    class: App\Flowpipe\Steps\ProcessBulkStep
    queue:
      priority: low
      queue: bulk-processing

🕒 Scheduled Workflows

Time-Based Scheduling

use Grazulex\LaravelFlowpipe\Flowpipe;

// Schedule for specific time
Flowpipe::fromYaml('daily-report')
    ->send($reportData)
    ->delay(now()->tomorrow()->setTime(9, 0)) // 9 AM tomorrow
    ->dispatch();

// Schedule relative to now
Flowpipe::fromYaml('reminder-email')
    ->send($reminderData)
    ->delay(now()->addDays(7)) // One week from now
    ->dispatch();

YAML Scheduling

flow: scheduled-maintenance
description: Scheduled system maintenance tasks

schedule:
  cron: "0 2 * * *"           # Every day at 2 AM
  timezone: "UTC"
  
send:
  maintenance_type: "daily"
  notify_admins: true

steps:
  - type: action
    name: cleanup-temp-files
    class: App\Flowpipe\Steps\CleanupTempFilesStep
    
  - type: action
    name: optimize-database
    class: App\Flowpipe\Steps\OptimizeDatabaseStep
    
  - type: action
    name: generate-health-report
    class: App\Flowpipe\Steps\GenerateHealthReportStep

Laravel Scheduler Integration

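// In app/Console/Kernel.php (Laravel 10 and earlier; a default Laravel 11+
// application defines schedules in routes/console.php instead)
protected function schedule(Schedule $schedule): void
{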
    // Daily reports
    $schedule->call(function () {
        Flowpipe::fromYaml('daily-analytics-report')
            ->send(['date' => now()->subDay()->toDateString()])
            ->dispatch();
    })->daily();
    
    // Weekly cleanup
    $schedule->call(function () {
        Flowpipe::fromYaml('weekly-cleanup')
            ->send(['retention_days' => 90])
            ->dispatch();
    })->weekly();
}

🔄 Long-Running Workflows

Chunked Processing

flow: bulk-user-import
description: Import large user datasets in chunks

send:
  file_path: "/imports/users.csv"
  chunk_size: 1000

steps:
  - type: action
    name: validate-file
    class: App\Flowpipe\Steps\ValidateImportFileStep
    
  - type: action
    name: chunk-file
    class: App\Flowpipe\Steps\ChunkFileStep
    
  - type: foreach
    name: process-chunks
    array: chunks
    queue:
      connection: redis
      queue: import-processing
      timeout: 600  # 10 minutes per chunk
    steps:
      - type: action
        name: process-chunk
        class: App\Flowpipe\Steps\ProcessUserChunkStep
      - type: action
        name: validate-chunk-results
        class: App\Flowpipe\Steps\ValidateChunkResultsStep
        
  - type: action
    name: consolidate-results
    class: App\Flowpipe\Steps\ConsolidateImportResultsStep
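
The `foreach` step above iterates over a `chunks` key, so an earlier step has to put that key into the payload. `ChunkFileStep` itself is not shown on this page. The following is a minimal sketch of the core logic such a step might contain, assuming the rows have already been read into an array and that `chunk_size` comes from the payload. The helper name `addChunks` is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Split already-loaded rows into fixed-size chunks and attach them to the
 * payload under the `chunks` key that the foreach step iterates over.
 * Hypothetical helper; the real ChunkFileStep is not shown on this page.
 */
function addChunks(array $payload, array $rows): array
{
    $size = (int) ($payload['chunk_size'] ?? 1000);

    if ($size < 1) {
        throw new InvalidArgumentException('chunk_size must be at least 1');
    }

    // array_chunk keeps the row order; the last chunk may be shorter.
    $payload['chunks'] = array_chunk($rows, $size);

    return $payload;
}

$payload = addChunks(['chunk_size' => 1000], range(1, 2500));

echo count($payload['chunks']), PHP_EOL;    // 3
echo count($payload['chunks'][2]), PHP_EOL; // 500
```

For very large imports it is usually safer to store chunk file paths or row offsets in the payload rather than the rows themselves, since queued payloads are serialized and stored by the queue backend.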

Progress Tracking

<?php

namespace App\Flowpipe\Steps;

use Closure;
use Grazulex\LaravelFlowpipe\Contracts\FlowStep;
use Illuminate\Support\Facades\Cache;

class TrackableProcessingStep implements FlowStep
{
    public function handle(mixed $payload, Closure $next): mixed
    {
        $jobId = $payload['job_id'];
        $totalItems = $payload['total_items'];
        
        $processed = 0;

        foreach ($payload['items'] as $item) {
            // Process item
            $this->processItem($item);
            $processed++;

            // Update progress; a counter is used instead of the array key,
            // which is not guaranteed to be a zero-based index
            $progress = ($processed / $totalItems) * 100;
            Cache::put("job_progress_{$jobId}", [
                'progress' => $progress,
                'processed' => $processed,
                'total' => $totalItems,
                'status' => 'processing'
            ], 3600);
        }
        
        Cache::put("job_progress_{$jobId}", [
            'progress' => 100,
            'processed' => $totalItems,
            'total' => $totalItems,
            'status' => 'completed'
        ], 3600);
        
        return $next($payload);
    }
    
    private function processItem($item): void
    {
        // Item processing logic
    }
}
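
The step above writes to the cache after every item, which for a large batch means one cache write per item. A minimal sketch of throttling those writes, in plain PHP, is to report only every N items and on the final item. The helper names `shouldReportProgress` and `progressPercent` and the interval of 100 are assumptions for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Report on every $every-th item and always on the last one.
 * Hypothetical helper, not part of Flowpipe.
 */
function shouldReportProgress(int $processed, int $total, int $every = 100): bool
{
    return $processed === $total || $processed % $every === 0;
}

/** Percentage complete, rounded to one decimal; an empty batch counts as done. */
function progressPercent(int $processed, int $total): float
{
    return $total > 0 ? round($processed / $total * 100, 1) : 100.0;
}

// Count how many progress writes a 250-item batch would trigger.
$writes = 0;
for ($processed = 1; $processed <= 250; $processed++) {
    if (shouldReportProgress($processed, 250)) {
        $writes++;
    }
}

echo $writes, PHP_EOL; // 3 (after items 100, 200 and 250)
```

Inside the step, the `Cache::put` call would be wrapped in `if (shouldReportProgress(...))`, trading progress granularity for fewer cache writes.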

🚨 Queue Error Handling

Retry Configuration

flow: resilient-api-processing
description: API processing with comprehensive retry logic

queue:
  tries: 5
  backoff: [30, 60, 300, 900, 1800]  # Increasing backoff delays in seconds
  timeout: 300
  max_exceptions: 3

steps:
  - type: action
    name: call-external-api
    class: App\Flowpipe\Steps\ExternalApiCallStep
    queue:
      # Step-specific retry configuration
      tries: 3
      backoff: [10, 30, 60]
      retry_until: 3600  # Keep retrying for 1 hour
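
In Laravel's own queued jobs, an array `backoff` gives the delay before each retry in turn, and the last value is reused when there are more retries than entries. Assuming Flowpipe passes the YAML `backoff` list through to the job unchanged (an assumption, not confirmed on this page), the resulting delay schedule can be sketched in plain PHP. The function name `retryDelays` is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * List the delay (in seconds) before each retry.
 * $tries counts the first attempt, so there are at most $tries - 1 retries.
 * Hypothetical helper mirroring Laravel's array-backoff behaviour.
 */
function retryDelays(int $tries, array $backoff): array
{
    $delays = [];

    for ($retry = 1; $retry < $tries; $retry++) {
        // Use the matching entry, or the last entry once the list runs out.
        $delays[] = $backoff[min($retry, count($backoff)) - 1] ?? 0;
    }

    return $delays;
}

echo implode(', ', retryDelays(5, [30, 60, 300, 900, 1800])), PHP_EOL; // 30, 60, 300, 900
echo implode(', ', retryDelays(3, [60])), PHP_EOL;                     // 60, 60
```

Under these semantics, a flow with `tries: 5` never reaches the fifth backoff value, because the job is marked as failed after the fifth attempt instead of being retried again.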

Dead Letter Handling

<?php

namespace App\Flowpipe\Steps;

use App\Mail\CriticalFlowFailureMail; // Hypothetical application Mailable
use App\Models\FailedJob;             // Hypothetical application model
use Closure;
use Grazulex\LaravelFlowpipe\Contracts\FlowStep;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Mail;

class FailureHandlingStep implements FlowStep
{
    public function handle(mixed $payload, Closure $next): mixed
    {
        try {
            return $this->processPayload($payload, $next);
        } catch (\Exception $e) {
            $this->handleFailure($payload, $e);
            throw $e; // Re-throw to trigger queue retry
        }
    }

    private function processPayload(mixed $payload, Closure $next): mixed
    {
        // Step-specific processing goes here, then pass the payload on
        return $next($payload);
    }

    private function handleFailure(array $payload, \Exception $e): void
    {
        // Log detailed error information
        Log::error('Flowpipe step failed', [
            'step' => static::class,
            'payload' => $payload,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);

        // Store for manual review
        FailedJob::create([
            'flow_name' => $payload['_flow_name'] ?? 'unknown',
            'step_name' => static::class,
            'payload' => json_encode($payload),
            'error_message' => $e->getMessage(),
            'failed_at' => now()
        ]);

        // Notify administrators for critical failures
        if ($this->isCriticalFailure($e)) {
            Mail::to(config('flowpipe.admin_email'))
                ->send(new CriticalFlowFailureMail($payload, $e));
        }
    }

    private function isCriticalFailure(\Exception $e): bool
    {
        // Example rule only; replace with the application's own definition
        return $e instanceof \RuntimeException;
    }
}

📊 Real-World Examples

E-commerce Order Processing

flow: async-order-processing
description: Asynchronous order processing for high-volume e-commerce

queue:
  connection: redis
  queue: orders
  tries: 3
  timeout: 1800

send:
  order_id: "ORD-12345"
  customer_id: 67890
  items: [...]

steps:
  # Immediate validation (synchronous)
  - type: action
    name: validate-order
    class: App\Flowpipe\Steps\ValidateOrderStep
    
  # Inventory check (queued)
  - type: action
    name: reserve-inventory
    class: App\Flowpipe\Steps\ReserveInventoryStep
    queue:
      queue: inventory
      timeout: 300
      
  # Payment processing (high priority queue)
  - type: action
    name: process-payment
    class: App\Flowpipe\Steps\ProcessPaymentStep
    queue:
      queue: payments
      priority: high
      timeout: 120
      
  # Fulfillment (separate queue)
  - type: action
    name: create-shipment
    class: App\Flowpipe\Steps\CreateShipmentStep
    queue:
      queue: fulfillment
      delay: 300  # 5-minute delay for payment confirmation
      
  # Notifications (low priority)
  - type: action
    name: send-confirmation
    class: App\Flowpipe\Steps\SendConfirmationStep
    queue:
      queue: notifications
      priority: low

Content Processing Pipeline

flow: content-processing-pipeline
description: Asynchronous content processing with multiple stages

queue:
  connection: redis
  timeout: 3600

send:
  content_id: "CONTENT-789"
  content_type: "video"
  source_url: "https://example.com/video.mp4"

steps:
  # Download (high CPU/bandwidth)
  - type: action
    name: download-content
    class: App\Flowpipe\Steps\DownloadContentStep
    queue:
      queue: downloads
      timeout: 1800
      
  # Processing (GPU-intensive)
  - type: action
    name: process-video
    class: App\Flowpipe\Steps\ProcessVideoStep
    queue:
      queue: video-processing
      timeout: 3600
      connection: gpu-workers
      
  # Multiple format generation (parallel)
  - type: parallel
    name: generate-formats
    steps:
      - type: action
        name: generate-720p
        class: App\Flowpipe\Steps\Generate720pStep
        queue:
          queue: video-encoding
      - type: action
        name: generate-1080p
        class: App\Flowpipe\Steps\Generate1080pStep
        queue:
          queue: video-encoding
      - type: action
        name: generate-thumbnails
        class: App\Flowpipe\Steps\GenerateThumbnailsStep
        queue:
          queue: image-processing
          
  # Final steps (low priority)
  - type: action
    name: update-cdn
    class: App\Flowpipe\Steps\UpdateCdnStep
    queue:
      queue: cdn-updates
      priority: low

🧪 Testing Queued Workflows

Testing Queue Integration

<?php

namespace Tests\Feature\Flowpipe;

use Tests\TestCase;
use Illuminate\Support\Facades\Queue;
use Grazulex\LaravelFlowpipe\Flowpipe;
// Also import FlowpipeJob from the package's job namespace

class QueueIntegrationTest extends TestCase
{
    public function test_flow_can_be_dispatched_to_queue()
    {
        Queue::fake();
        
        Flowpipe::fromYaml('user-onboarding')
            ->send(['user_id' => 123])
            ->dispatch();
            
        Queue::assertPushed(FlowpipeJob::class, function ($job) {
            return $job->flowName === 'user-onboarding';
        });
    }
    
    public function test_queued_flow_processes_correctly()
    {
        $result = Flowpipe::fromYaml('user-onboarding')
            ->send(['user_id' => 123])
            ->executeSync(); // Force synchronous execution for testing
            
        $this->assertTrue($result->isSuccessful());
        $this->assertDatabaseHas('users', ['id' => 123, 'onboarded' => true]);
    }
}

Performance Testing

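public function test_bulk_processing_performance()
{
    // Fake the queue so only dispatch overhead is measured, not job execution
    // (with the sync driver, dispatch() would run each flow inline)
    Queue::fake();

    $startTime = microtime(true);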
    
    // Process 1000 items
    for ($i = 0; $i < 1000; $i++) {
        Flowpipe::fromYaml('item-processing')
            ->send(['item_id' => $i])
            ->dispatch();
    }
    
    $dispatchTime = microtime(true) - $startTime;
    
    // Should dispatch quickly (under 1 second)
    $this->assertLessThan(1.0, $dispatchTime);
}

💡 Best Practices

1. Choose Appropriate Queues

# Fast operations: default queue
- type: action
  class: SendEmailStep
  queue:
    queue: default

# Heavy operations: dedicated queue
- type: action
  class: ProcessVideoStep
  queue:
    queue: heavy-processing
    timeout: 3600

# Critical operations: high priority
- type: action
  class: ProcessPaymentStep
  queue:
    queue: payments
    priority: high

2. Handle Failures Gracefully

queue:
  tries: 3
  backoff: [60, 300, 900]  # 1min, 5min, 15min
  timeout: 1800

steps:
  - type: action
    class: CriticalStep
    queue:
      dead_letter_queue: failed-critical-tasks

3. Monitor Queue Health

// In a monitoring command
public function handle()
{
    $queueSizes = [
        'default' => Queue::size('default'),
        'heavy-processing' => Queue::size('heavy-processing'),
        'payments' => Queue::size('payments'),
    ];
    
    foreach ($queueSizes as $queue => $size) {
        if ($size > 1000) {
            // Alert operations team
            $this->alertOpsTeam("Queue {$queue} has {$size} pending jobs");
        }
    }
}
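
A single threshold of 1000 treats every queue alike, although a backlog on `payments` is usually more urgent than one on a bulk queue. A minimal sketch of per-queue thresholds in plain PHP follows. The helper name `overloadedQueues` and the limits used are illustrative assumptions; in the command above, the sizes would come from `Queue::size()`.

```php
<?php

declare(strict_types=1);

/**
 * Return alert messages for queues whose size exceeds their threshold.
 * Hypothetical helper: the per-queue limits are illustrative values.
 */
function overloadedQueues(array $sizes, array $thresholds, int $default = 1000): array
{
    $alerts = [];

    foreach ($sizes as $queue => $size) {
        // Fall back to the default limit when a queue has no specific one.
        $limit = $thresholds[$queue] ?? $default;

        if ($size > $limit) {
            $alerts[] = "Queue {$queue} has {$size} pending jobs (limit {$limit})";
        }
    }

    return $alerts;
}

$alerts = overloadedQueues(
    ['default' => 400, 'heavy-processing' => 1500, 'payments' => 80],
    ['payments' => 50] // Payments should alert much earlier than bulk work
);

echo implode(PHP_EOL, $alerts), PHP_EOL;
// Queue heavy-processing has 1500 pending jobs (limit 1000)
// Queue payments has 80 pending jobs (limit 50)
```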

4. Use Appropriate Timeouts

# Quick API calls
timeout: 30

# File processing
timeout: 300

# Heavy computation
timeout: 3600

# Video processing
timeout: 7200
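
With Laravel's queue workers, a job's timeout should be several seconds shorter than the `retry_after` value of its queue connection. Otherwise a job that is still running can be released and picked up a second time. The check below is a minimal sketch of that rule in plain PHP. The function name `timeoutIsSafe` and the 5-second margin are assumptions, and whether Flowpipe validates this itself is not stated on this page.

```php
<?php

declare(strict_types=1);

/**
 * Check that a timeout leaves a safety margin below the connection's
 * retry_after value (both in seconds). Hypothetical helper.
 */
function timeoutIsSafe(int $timeout, int $retryAfter, int $margin = 5): bool
{
    return $timeout + $margin <= $retryAfter;
}

// Laravel's default retry_after is 90 seconds.
var_dump(timeoutIsSafe(30, 90));     // bool(true)
var_dump(timeoutIsSafe(3600, 90));   // bool(false)
var_dump(timeoutIsSafe(3600, 3660)); // bool(true)
```

In practice this means the one-hour and two-hour timeouts above only work on a connection whose `retry_after` in `config/queue.php` has been raised accordingly.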
