This Node.js application demonstrates the dynamic orchestration of worker threads with alternating roles (Pushers and Pullers) and state persistence using Redis. The Pushers add messages to a queue, and the Pullers process them. After processing 10 messages, roles are switched dynamically. The system gracefully shuts down while persisting the state to Redis, allowing restoration upon restart.
- Dynamic Role Reversal: Pushers and Pullers alternate roles after every 10 messages.
- Scalability: Incremental spawning of Pushers and Pullers with limits (10 maximum for each role).
- State Persistence: Save and restore state using Redis.
- Graceful Shutdown: Proper cleanup of workers and state saving on exit.
- Express API: Start/stop processes, fetch system state, and stream state updates via SSE.
- Front-End Interface: Provides real-time updates on worker statuses, message counts, and system state.
- Node.js (v16 or above).
- Docker to run Redis Stack.
To manage state persistence, the application uses Docker Redis Stack. Follow these steps to set it up:
- Pull the Redis Stack Docker image:
  docker pull redis/redis-stack:latest
- Run Redis Stack:
  docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
- Verify Redis is running:
  - Access Redis Insight (a GUI tool) at: http://localhost:8001
  - Ensure the Redis server is running on port 6379.
- Clone the repository:
  git clone <repository_url>
  cd <repository_folder>
- Install dependencies:
  npm install
- Start the application:
  node server.js
  OR
  npm run saad
  OR
  npm run start
  OR
  npm run dev
- Open the front-end:
  - Open your browser and navigate to: http://localhost:3000
  - The front-end will be available to interact with the back-end system.
- Access the API:
  - Start process: POST http://localhost:3000/start
  - Stop process: POST http://localhost:3000/stop
  - Get system state: GET http://localhost:3000/state
  - Get Redis state: GET http://localhost:3000/getredisstate
  - Stream state updates via SSE: GET http://localhost:3000/events
├── server.js # Express server with API routes
├── master.js # Manages workers, role switches, and state persistence
├── state.js # Redis state handling
├── queue.js # BullMQ queue management
├── workers
│ ├── pusherWorker.js # Pushes messages to the queue
│ └── pullerWorker.js # Processes messages from the queue
├── public
│ ├── index.html # Front-end HTML for system interface
│ ├── style.css # Front-end styling for the user interface
│ └── script.js # JavaScript to handle real-time updates and interactions
The front-end provides a user interface for interacting with the worker threads system. The user can start and stop the process, get real-time updates on worker states, and see the Redis state.
- Real-Time System Status:
  - Displays the current state of the system, including Pusher and Puller counts, message counts, and role switch counts.
  - Updates in real time using Server-Sent Events (SSE).
- Interactive Controls:
  - Provides buttons to start and stop the worker process.
  - Fetches and displays the current Redis state with the "Get Redis State" button.
- Real-Time Updates:
  - Visualizes the progress of the worker threads with progress bars and message counts.
  - Displays which role (Pusher or Puller) is currently active, including how many messages have been processed in each increment.
- Elegant Display of Data:
  - Presents information in structured cards for easy tracking of system metrics (such as message count, worker counts, and state changes).
- Responsive and User-Friendly Interface:
  - Interacts with the back-end using AJAX calls, providing updates without reloading the page.
server.js handles the Express API endpoints:
- Start Process: Starts the worker management process.
- Stop Process: Stops all workers and saves the state.
- State Endpoints: Provide current system and Redis state.
- SSE Endpoint: Streams system state updates in real-time.
master.js contains the core logic for managing workers, switching roles, and handling state.
- startProcess:
  - Restores the last known state from Redis.
  - Spawns initial Pusher and Puller workers if no state exists.
- stopProcess:
  - Terminates all active workers.
  - Saves the current state to Redis.
- spawnPusher/spawnPuller:
  - Creates new Pusher or Puller workers while enforcing the per-role limit.
- switchRoles:
  - Alternates between Pusher and Puller roles every 10 messages.
  - Dynamically adjusts the number of workers based on the current phase.
- loadPreviousState:
  - Retrieves and restores state from Redis.
- getSystemState:
  - Returns a snapshot of the current system state.
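The role-switching and limit rules listed above can be sketched as a small state machine. This is a minimal sketch under assumptions, not the code in master.js: `createOrchestrator`, `MAX_WORKERS`, `SWITCH_EVERY`, and the state field names are hypothetical, real worker threads are replaced by counters, and growing the newly active role by one worker on each switch is only one possible reading of "dynamically adjusts".

```javascript
// Hypothetical constants taken from the limits described in this README:
// at most 10 workers per role, and a role switch every 10 messages.
const MAX_WORKERS = 10;
const SWITCH_EVERY = 10;

function createOrchestrator() {
  const state = {
    activeRole: 'pusher',
    pushers: 1,
    pullers: 0,
    messagesInPhase: 0,
    roleSwitches: 0,
  };

  // Count one more worker for a role unless the limit is already reached.
  function spawn(role) {
    const key = role === 'pusher' ? 'pushers' : 'pullers';
    if (state[key] >= MAX_WORKERS) return false;
    state[key] += 1;
    return true;
  }

  // Flip the active role, reset the per-phase counter, and
  // (assumption) add one worker for the role that just became active.
  function switchRoles() {
    state.activeRole = state.activeRole === 'pusher' ? 'puller' : 'pusher';
    state.messagesInPhase = 0;
    state.roleSwitches += 1;
    spawn(state.activeRole);
  }

  // Called once per handled message; triggers a switch on every 10th.
  function onMessage() {
    state.messagesInPhase += 1;
    if (state.messagesInPhase >= SWITCH_EVERY) switchRoles();
  }

  // Return a copy so callers cannot mutate internal state.
  function getSystemState() {
    return { ...state };
  }

  return { spawn, onMessage, getSystemState };
}
```

Keeping the counters separate from thread creation makes the switching rule easy to test in isolation; a real implementation would create and terminate worker threads where this sketch only updates numbers.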
state.js manages Redis connections and state persistence.
- saveState:
  - Saves the system state to Redis.
- loadState:
  - Retrieves the last saved state from Redis.
- redisConfig:
  - Contains the configuration for the Redis connection.
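One way saveState and loadState could work is to serialize the state as JSON under a single key. The sketch below assumes a client object that exposes promise-based `get(key)` and `set(key, value)` methods; the key name `system:state` and the in-memory `createFakeClient` helper are hypothetical and exist only so the example runs without a Redis server.

```javascript
const STATE_KEY = 'system:state'; // hypothetical key name

// Serialize the state object and store it under one key.
async function saveState(client, state) {
  await client.set(STATE_KEY, JSON.stringify(state));
}

// Return the parsed state, or null when nothing has been saved yet.
async function loadState(client) {
  const raw = await client.get(STATE_KEY);
  return raw === null || raw === undefined ? null : JSON.parse(raw);
}

// In-memory stand-in for a Redis client (illustration only).
function createFakeClient() {
  const store = new Map();
  return {
    async set(key, value) {
      store.set(key, value);
      return 'OK';
    },
    async get(key) {
      return store.has(key) ? store.get(key) : null;
    },
  };
}
```

Returning null for a missing key lets the caller tell "no saved state" apart from an empty state object, which matches the startProcess behaviour of spawning initial workers only when no state exists.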
queue.js handles BullMQ queue setup and job management.
- messageQueue:
  - Configures the BullMQ queue.
- addJob:
  - Adds a new job to the queue with the provided data.
pusherWorker.js pushes messages to the BullMQ queue.
- Interval: Sends a message every second.
- Communicates with the master thread upon successful push.
pullerWorker.js processes messages from the BullMQ queue.
- Limiter: Restricts processing rate to 10 jobs per second.
- Removes jobs after processing.
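The rate limit and job removal described above map onto BullMQ's `limiter` worker option and `removeOnComplete` job option. The fragment below is a sketch of what such options might look like, not the project's actual configuration; the queue name `messages` and the job payload in the comments are hypothetical.

```javascript
// Assumed Redis connection, mirroring redisConfig in state.js.
const connection = { host: 'localhost', port: 6379 };

// Worker options: process at most 10 jobs per 1000 ms.
const workerOptions = {
  connection,
  limiter: { max: 10, duration: 1000 },
};

// Job options: ask BullMQ to delete the job once it completes.
const jobOptions = { removeOnComplete: true };

// With the bullmq package installed, these would be used roughly as:
//   new Worker('messages', async (job) => { /* handle job.data */ }, workerOptions);
//   queue.add('message', { text: 'hello' }, jobOptions);
```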
Endpoint: POST /start
- Starts the worker management process.
- Initializes workers and restores state if available.
Endpoint: POST /stop
- Stops all workers.
- Persists the current state to Redis.
Endpoint: GET /state
- Returns the current state of workers and role switches.
Endpoint: GET /getredisstate
- Returns the last saved state from Redis.
Endpoint: GET /events
- Streams real-time state updates using Server-Sent Events (SSE).
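For the /events endpoint, an SSE response is a long-lived HTTP response with the `text/event-stream` content type, where each message is a `data:` line followed by a blank line. The following is a minimal sketch of such a handler, assuming a `getState` function is supplied by the caller; `formatSseEvent`, `createSseHandler`, and the one-second default interval are hypothetical and not taken from server.js.

```javascript
// Format one SSE message: a "data:" line terminated by a blank line.
function formatSseEvent(state) {
  return `data: ${JSON.stringify(state)}\n\n`;
}

// Build a request handler that sends the current state immediately,
// then again every intervalMs until the client disconnects.
function createSseHandler(getState, intervalMs = 1000) {
  return (req, res) => {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });
    res.write(formatSseEvent(getState()));
    const timer = setInterval(() => {
      res.write(formatSseEvent(getState()));
    }, intervalMs);
    // Stop the timer when the client closes the connection.
    req.on('close', () => clearInterval(timer));
  };
}
```

The handler only relies on `writeHead`, `write`, and the request's `close` event, so it should work with a plain Node.js HTTP server as well as an Express route such as `app.get('/events', createSseHandler(getSystemState))`. In the browser, `new EventSource('/events')` would receive each message through its `onmessage` callback.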
You can configure Redis connection settings in state.js:
const redisConfig = { host: 'localhost', port: 6379 };
The application listens for SIGINT and ensures:
- All workers are terminated.
- The current state is saved to Redis.
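The shutdown sequence above could be wired up as follows. This is a sketch under assumptions rather than the application's actual handler: `createShutdownHandler` and its injected dependencies (`workers`, `getState`, `saveState`, `exit`) are hypothetical names, and the workers are assumed to expose a `terminate()` method that returns a Promise, as worker_threads workers do.

```javascript
// Build a shutdown handler from injected dependencies so the
// sequence can be exercised without real workers or Redis.
function createShutdownHandler({ workers, getState, saveState, exit }) {
  let shuttingDown = false;
  return async function shutdown() {
    if (shuttingDown) return; // ignore a repeated Ctrl+C
    shuttingDown = true;
    try {
      // 1. Terminate every worker and wait for all of them.
      await Promise.all(workers.map((w) => w.terminate()));
      // 2. Persist the final state.
      await saveState(getState());
      exit(0);
    } catch (err) {
      console.error('Shutdown failed:', err);
      exit(1);
    }
  };
}

// Wiring sketch (not executed here):
//   process.on('SIGINT', createShutdownHandler({
//     workers, getState, saveState, exit: process.exit,
//   }));
```

Terminating workers before saving means the persisted state cannot change while it is being written; the guard flag keeps a second SIGINT from starting the sequence again.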
Possible future improvements:
- Adding support for configurable limits (e.g., MAX_PUSHERS, MAX_PULLERS).
- Enhancing state restoration to handle partial failures.
- Implementing metrics for monitoring worker performance.