PyTrickle Core SDK Classes

John | Elite Encoder edited this page Aug 20, 2025 · 5 revisions

This document provides a comprehensive technical overview of the core classes in the PyTrickle SDK essential for developing AI-powered video, audio, and data streaming applications for the Livepeer network.

Architecture Overview

PyTrickle follows a modular architecture built around these key concepts:

  • Frame-based Processing: Video and audio data flow through the system as discrete frames
  • Async Processing: All processing is asynchronous, so I/O and model inference do not block the streaming pipeline
  • Protocol Abstraction: The trickle protocol handles streaming mechanics transparently
  • HTTP API Integration: Built-in REST API for remote control and parameter updates
  • Real-time Data Publishing: Support for publishing structured data alongside media streams

Core SDK Classes

1. StreamProcessor - High-Level Application Interface

The StreamProcessor class provides the simplest way to build streaming applications with custom processing logic. It wraps lower-level components and provides a clean function-based API.

Key Features:

  • Function-based processing (no class inheritance required)
  • Built-in HTTP server with REST API
  • Real-time parameter updates
  • Text/data publishing capabilities
  • Automatic model loading and lifecycle management

Basic Structure:

from typing import List

from pytrickle import StreamProcessor
from pytrickle.frames import VideoFrame, AudioFrame

# Define your processing functions
async def process_video(frame: VideoFrame) -> VideoFrame:
    # Your AI model processing here
    return frame  # replace with the processed frame

async def process_audio(frame: AudioFrame) -> List[AudioFrame]:
    # Your audio processing here
    return [frame]  # replace with the processed frame(s)

def load_model(**kwargs):
    # Initialize your models here
    pass

def update_params(params: dict):
    # Handle parameter updates here
    pass

# Create and run processor
processor = StreamProcessor(
    video_processor=process_video,
    audio_processor=process_audio,
    model_loader=load_model,
    param_updater=update_params,
    name="my-ai-processor",
    port=8000
)
processor.run()  # Starts HTTP server and processing

2. FrameProcessor - Advanced Processing Interface

For more complex applications requiring fine-grained control, inherit from FrameProcessor to implement custom processing logic with full access to the streaming pipeline.

Key Features:

  • Abstract base class for custom processors
  • Lifecycle management (initialization, processing, cleanup)
  • Error handling and state management
  • Direct integration with StreamServer and TrickleClient

Basic Structure:

from typing import List, Optional

from pytrickle import FrameProcessor, StreamServer
from pytrickle.frames import VideoFrame, AudioFrame

class MyAIProcessor(FrameProcessor):
    def __init__(self, model_path: str, **kwargs):
        self.model_path = model_path
        self.model = None
        super().__init__(**kwargs)
    
    def load_model(self, **kwargs):
        # Load your AI model (load_my_model is a placeholder for your loader)
        self.model = load_my_model(self.model_path)
    
    async def process_video_async(self, frame: VideoFrame) -> Optional[VideoFrame]:
        # Process video frame with your model
        processed_tensor = await self.model.process(frame.tensor)
        return frame.replace_tensor(processed_tensor)
    
    async def process_audio_async(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
        # Process audio frame
        return [frame]  # Pass through or process
    
    def update_params(self, params: dict):
        # Handle real-time parameter updates
        if "threshold" in params:
            self.model.threshold = params["threshold"]

# Usage (run inside an async entry point, e.g. via asyncio.run)
processor = MyAIProcessor(model_path="path/to/model")
server = StreamServer(frame_processor=processor, port=8000)
await server.run_forever()

3. VideoFrame and AudioFrame - Media Data Containers

These classes encapsulate media data with timing information and provide utilities for format conversion and processing.

VideoFrame

Represents video data as PyTorch tensors with timing metadata.

Key Properties:

  • tensor: PyTorch tensor containing image data (typically HWC or CHW format)
  • timestamp: Frame timestamp for synchronization
  • time_base: Timing base for timestamp interpretation
  • log_timestamps: Performance monitoring timestamps

Key Methods:

# Create from tensor
frame = VideoFrame.from_tensor(my_tensor, timestamp=0)

# Replace tensor (common for processing)
processed_frame = frame.replace_tensor(new_tensor)

# Convert the tensor to an av.VideoFrame for encoding
av_frame = frame.to_av_frame(frame.tensor)
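As a concrete sketch of the replace_tensor pattern, the helper below adjusts frame brightness. It uses NumPy to stand in for the frame's tensor data and assumes an HWC uint8 layout; both the helper name and the layout are illustrative, not part of the SDK:

```python
import numpy as np

def adjust_brightness(tensor: np.ndarray, factor: float) -> np.ndarray:
    """Scale pixel intensities by `factor`, clamping to the uint8 range."""
    scaled = tensor.astype(np.float32) * factor
    return np.clip(scaled, 0, 255).astype(np.uint8)

# Inside a video processor you would apply it and rebuild the frame:
#   return frame.replace_tensor(adjust_brightness(frame.tensor, 1.2))
```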

AudioFrame

Represents audio data with sample information and timing.

Key Properties:

  • samples: NumPy array containing audio samples
  • format: Audio format (e.g., 's16', 'fltp')
  • layout: Channel layout (e.g., 'mono', 'stereo')
  • rate: Sample rate in Hz
  • nb_samples: Number of samples per channel
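To illustrate working with the samples array, here is a hedged sketch of a gain function for 's16' (int16) audio. It assumes only that samples is a NumPy array as described above; the function name is illustrative:

```python
import numpy as np

def apply_gain_s16(samples: np.ndarray, gain: float) -> np.ndarray:
    """Scale int16 audio samples, clamping to avoid integer overflow."""
    scaled = samples.astype(np.float32) * gain
    return np.clip(scaled, -32768, 32767).astype(np.int16)
```

Inside an audio processor you would build the returned AudioFrame(s) from the scaled samples. Clamping matters: without it, a loud sample multiplied past the int16 range would wrap around and produce audible distortion.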

4. TrickleProtocol - Stream Coordination

Manages the underlying trickle protocol for media streaming, handling subscription, publishing, and control channels.

Key Features:

  • Automatic connection management and reconnection
  • Ingress/egress stream coordination
  • Control message handling
  • Data publishing for structured output
  • FPS monitoring and performance tracking

5. StreamServer - HTTP API Server

Provides a complete HTTP server with REST API for stream management and control.

Key Endpoints:

  • POST /api/stream/start - Start processing stream
  • POST /api/stream/params - Update processing parameters
  • GET /api/stream/status - Get current status
  • POST /api/stream/stop - Stop processing

📚 For complete API documentation, see StreamServer API Reference

6. Data Publishing Support

Introduced in PR #18, PyTrickle supports publishing structured data alongside media streams for analytics, annotations, or control data.

Key Features:

  • JSON data publishing through data channel
  • Batched data transmission for efficiency
  • Integration with StreamProcessor for easy access
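The exact publishing call depends on the StreamProcessor API, but the batching idea can be sketched as a small buffer that flushes records as a single JSON payload every N entries (publish_fn here is a hypothetical send callback, not an SDK function):

```python
import json
from typing import Callable, List

class DataBatcher:
    """Buffer structured records and flush them as one JSON payload."""

    def __init__(self, publish_fn: Callable[[str], None], batch_size: int = 10):
        self.publish_fn = publish_fn  # hypothetical callback that sends a payload
        self.batch_size = batch_size
        self.buffer: List[dict] = []

    def add(self, record: dict) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Send all buffered records in one transmission, then reset
        if self.buffer:
            self.publish_fn(json.dumps(self.buffer))
            self.buffer = []
```

Batching several records per transmission keeps per-message overhead low on the data channel, which is why the SDK transmits data in batches.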

Complete Examples

Video Processing Example: Green Tint Filter

This example demonstrates real-time video processing with OpenCV, parameter updates, and performance monitoring. examples/process_video_example.py

Audio Processing Example: Echo Effect

This example demonstrates audio processing with channel handling and format conversion. examples/process_audio_example.py

Text Data Publishing Example: Frame Analysis

This example demonstrates publishing structured data every N frames, based on the data publishing functionality from PR #18. examples/text_output_example.py

API Usage Examples

Starting a Stream

curl -X POST http://localhost:8000/api/stream/start \
  -H "Content-Type: application/json" \
  -d '{
    "subscribe_url": "http://localhost:3389/input",
    "publish_url": "http://localhost:3389/output",
    "gateway_request_id": "my_stream",
    "params": {
      "intensity": 0.8,
      "echo_delay": 0.3,
      "analysis_interval": 60
    }
  }'

Updating Parameters

curl -X POST http://localhost:8000/api/stream/params \
  -H "Content-Type: application/json" \
  -d '{
    "intensity": 0.9,
    "echo_strength": 0.6,
    "brightness_threshold": 0.4
  }'

Checking Status

curl http://localhost:8000/api/stream/status

Best Practices

  1. Performance: Offload CPU-intensive operations to an executor so the async event loop stays responsive
  2. Memory Management: Process frames in-place when possible using replace_tensor()
  3. Error Handling: Implement robust error handling in processing functions
  4. Parameter Validation: Validate and clamp parameter values in update_params()
  5. Logging: Use structured logging for debugging and monitoring
  6. Resource Cleanup: Properly manage model resources and memory
  7. Data Publishing: Batch data publishing for better performance
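For point 4, a minimal sketch of defensive parameter handling (the keys mirror the curl examples above; the ranges are illustrative assumptions, not SDK requirements):

```python
def clamp(value: float, lo: float, hi: float) -> float:
    """Constrain a numeric parameter to a safe range."""
    return max(lo, min(hi, value))

def update_params(params: dict) -> dict:
    """Validate and clamp incoming parameter updates, ignoring unknown keys."""
    validated = {}
    if "intensity" in params:
        validated["intensity"] = clamp(float(params["intensity"]), 0.0, 1.0)
    if "echo_delay" in params:
        validated["echo_delay"] = clamp(float(params["echo_delay"]), 0.0, 5.0)
    return validated
```

Ignoring unknown keys and clamping out-of-range values means a malformed POST /api/stream/params request degrades gracefully instead of crashing the processing loop.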

Integration Patterns

With AI Models

import asyncio

import torch

from pytrickle import FrameProcessor
from pytrickle.frames import VideoFrame

# PyTorch model integration
class AIVideoProcessor(FrameProcessor):
    def load_model(self, **kwargs):
        self.model = torch.jit.load(kwargs["model_path"])
        self.model.eval()
    
    async def process_video_async(self, frame: VideoFrame):
        # Run blocking inference in an executor so the event loop stays responsive
        with torch.no_grad():
            output = await asyncio.get_event_loop().run_in_executor(
                None, self.model, frame.tensor
            )
        return frame.replace_tensor(output)

With External APIs

import aiohttp

from pytrickle.frames import VideoFrame

async def process_with_api(frame: VideoFrame) -> VideoFrame:
    async with aiohttp.ClientSession() as session:
        # Serialize the frame and send it to the external API
        # (frame_to_bytes and bytes_to_tensor are placeholder helpers
        # you implement for your serialization format)
        frame_data = frame_to_bytes(frame)
        async with session.post('http://api.example.com/process', data=frame_data) as resp:
            resp.raise_for_status()
            result_data = await resp.read()
            processed_tensor = bytes_to_tensor(result_data)
            return frame.replace_tensor(processed_tensor)