Workers Module
This file defines the Workers class, which is responsible for managing all worker instances in the @sodacore/core library. It handles worker discovery, spawning, pooling, queue management, message dispatching, error handling, and automatic worker restarts.
Features
- Worker Discovery: Finds all registered worker modules using metadata and prepares controllers for each.
- Worker Pooling: Supports multiple worker instances per controller, configurable via the poolSize option.
- Queue Management: Maintains a queue for each worker controller, dispatching tasks to available workers and handling timeouts.
- IPC Handling: Manages message passing between the main process and workers, including initialization, method calls, and error handling.
- Automatic Restart: Automatically restarts workers if they exit unexpectedly, unless the application is shutting down.
- Monitoring: Periodically checks for backed-up queues and logs warnings if tasks are waiting too long.
Usage
import Workers from './module/workers';
const workers = new Workers(config);
await workers.init();
await workers.start();
// ... dispatch tasks to workers
await workers.stop();
API
Properties
- controllers: Record<string, IWorkerControllerItem>
Map of worker controllers, keyed by UID.
- checkTimer: Timer | null
Interval timer for checking backed-up queues.
Methods
async init(): Promise<void>
Discovers all worker modules, creates controller definitions, and starts a periodic check for backed-up queues.
async start(): Promise<void>
Starts all workers for each controller, creating the number of worker instances specified by poolSize.
async stop(): Promise<void>
Stops all workers and clears the periodic queue check timer.
getControllers(): Record<string, IWorkerControllerItem>
Returns all registered worker controllers.
getController(uid: string): IWorkerControllerItem | undefined
Returns a specific worker controller by its UID.
dispatch<T = any>(uid: string, method: string, params: unknown[] = [], timeout?: number): Promise<T>
Queues a method call for the controller identified by uid and returns a promise that resolves with the worker's result or rejects with its error. Handles timeouts and queue management.
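The queue-and-promise behaviour described for dispatch can be sketched roughly as below. This is a minimal illustration and not the module's actual implementation: SketchDispatcher, QueuedTask, and the complete helper are hypothetical names, and the sketch assumes that timeout is in milliseconds and that a timed-out task is removed from its queue.

```typescript
// Hypothetical task shape for illustration; the real @sodacore/core types may differ.
interface QueuedTask<T = unknown> {
  method: string;
  params: unknown[];
  queuedAt: number;
  resolve: (value: T) => void;
  reject: (reason: Error) => void;
  timer?: ReturnType<typeof setTimeout>;
}

class SketchDispatcher {
  // One FIFO queue per controller UID.
  readonly queues = new Map<string, QueuedTask<any>[]>();

  dispatch<T = unknown>(uid: string, method: string, params: unknown[] = [], timeout?: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const queue = this.queues.get(uid) ?? [];
      this.queues.set(uid, queue);

      const task: QueuedTask<T> = { method, params, queuedAt: Date.now(), resolve, reject };

      if (timeout !== undefined) {
        // Assumption: timeout is in milliseconds and cancels the queued task.
        task.timer = setTimeout(() => {
          const index = queue.indexOf(task);
          if (index !== -1) queue.splice(index, 1);
          reject(new Error(`Task "${method}" on "${uid}" timed out after ${timeout}ms`));
        }, timeout);
      }

      queue.push(task);
    });
  }

  // Stand-in for a worker reply: settles the oldest queued task for a controller.
  complete(uid: string, result: unknown): boolean {
    const task = this.queues.get(uid)?.shift();
    if (!task) return false;
    if (task.timer !== undefined) clearTimeout(task.timer);
    task.resolve(result);
    return true;
  }
}
```

In this sketch the caller only ever sees the returned promise; the queue entry keeps the resolve and reject callbacks so that a later worker reply, or the timeout, can settle it.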
Private Methods
private createWorker(uid: string, name: string, path: string): Worker
Creates a new worker instance, sets up event listeners for open, error, close, and message events, and handles worker lifecycle and queue processing.
private processQueue(controller: IWorkerControllerItem): void
Finds an available worker in the pool, assigns the next queued task, and sends the message to the worker.
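As a rough sketch of that step, the function below picks the first idle worker and hands it the oldest queued task. The PoolWorker, PendingCall, and ControllerSketch shapes, the busy flag, and the message format are all assumptions made for illustration; the real IWorkerControllerItem may look different.

```typescript
// Hypothetical shapes; not the library's actual interfaces.
interface PoolWorker {
  busy: boolean;
  postMessage(message: unknown): void;
}

interface PendingCall {
  id: number;
  method: string;
  params: unknown[];
}

interface ControllerSketch {
  workers: PoolWorker[];
  queue: PendingCall[];
}

// Returns true if a task was handed to a worker, false otherwise.
function processQueueSketch(controller: ControllerSketch): boolean {
  // Find an idle worker first, so the task is left queued if the pool is saturated.
  const worker = controller.workers.find((w) => !w.busy);
  if (!worker) return false;

  const task = controller.queue.shift();
  if (!task) return false;

  // Mark the worker busy so it handles one task at a time.
  worker.busy = true;
  worker.postMessage({ id: task.id, method: task.method, params: task.params });
  return true;
}
```

When every worker is busy the task simply stays in the queue, which is why the queue has to be processed again whenever a worker finishes.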
private checkQueue(): void
Periodically checks for tasks that have been queued for more than one minute and logs a warning if any are found.
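The staleness check can be illustrated with a small pure function. This is only a sketch: TimedTask, findStaleTasks, and the queuedAt timestamp field are hypothetical, and the one-minute threshold is taken from the description above.

```typescript
// One minute, per the description of checkQueue.
const STALE_AFTER_MS = 60_000;

// Hypothetical task shape carrying the time it entered the queue.
interface TimedTask {
  method: string;
  queuedAt: number; // epoch milliseconds
}

// Returns, per controller UID, how many tasks have waited longer than the threshold.
function findStaleTasks(
  queues: Record<string, TimedTask[]>,
  now: number = Date.now(),
  staleAfterMs: number = STALE_AFTER_MS,
): Record<string, number> {
  const stale: Record<string, number> = {};
  for (const [uid, queue] of Object.entries(queues)) {
    const count = queue.filter((task) => now - task.queuedAt > staleAfterMs).length;
    if (count > 0) stale[uid] = count;
  }
  return stale;
}
```

A periodic timer could call a function like this and log a warning for each UID in the result.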
How It Works
Initialization:
- Discovers all worker modules and creates controllers for each.
- Sets up a periodic check for backed-up queues.
Starting Workers:
- For each controller, spawns the number of worker instances specified by poolSize.
- Each worker is managed and tracked in the controller's workers array.
Dispatching Tasks:
- Tasks are queued per controller and dispatched to available workers.
- Each worker processes one task at a time; results or errors are returned via IPC.
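One common way to route a worker's reply back to the right caller is to tag each request with an id and keep the pending promise callbacks in a map. The sketch below shows that pattern under assumed names: PendingCalls, WorkerReply, and the { id, ok, result | error } message shape are hypothetical and not taken from the module.

```typescript
// Hypothetical reply format sent back by a worker.
type WorkerReply =
  | { id: number; ok: true; result: unknown }
  | { id: number; ok: false; error: string };

interface PendingEntry {
  resolve: (value: unknown) => void;
  reject: (reason: Error) => void;
}

class PendingCalls {
  private nextId = 1;
  private readonly pending = new Map<number, PendingEntry>();

  // Registers a new call and returns the id to send along with the request.
  register(): { id: number; promise: Promise<unknown> } {
    const id = this.nextId++;
    const promise = new Promise<unknown>((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
    });
    return { id, promise };
  }

  // Settles the matching promise; returns false for unknown or already settled ids.
  handleMessage(reply: WorkerReply): boolean {
    const entry = this.pending.get(reply.id);
    if (!entry) return false;
    this.pending.delete(reply.id);
    if (reply.ok) entry.resolve(reply.result);
    else entry.reject(new Error(reply.error));
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}
```

Deleting the entry before settling it means a duplicate reply for the same id is ignored rather than settling a promise twice.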
Worker Lifecycle:
- Handles worker events:
  - open: Initializes the worker.
  - error: Logs errors.
  - close: Removes the worker from the pool and restarts it if not shutting down.
  - message: Handles responses, resolves/rejects promises, and manages the queue.
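The close handling described above (remove the worker from the pool, then respawn unless shutting down) might look roughly like the following. RestartingPool, ClosableWorker, and the onClose hook are hypothetical stand-ins for the real worker and its close event; the sketch also leaves out actually terminating workers on stop.

```typescript
// Hypothetical minimal worker surface: something that can report that it closed.
interface ClosableWorker {
  onClose(handler: () => void): void;
}

class RestartingPool<W extends ClosableWorker> {
  readonly workers: W[] = [];
  private shuttingDown = false;
  private readonly spawn: () => W;
  private readonly poolSize: number;

  constructor(spawn: () => W, poolSize: number) {
    this.spawn = spawn;
    this.poolSize = poolSize;
  }

  // Spawns poolSize workers.
  start(): void {
    for (let i = 0; i < this.poolSize; i++) this.add();
  }

  // Marks the pool as shutting down so closed workers are not replaced.
  // (A real implementation would also terminate the workers here.)
  stop(): void {
    this.shuttingDown = true;
  }

  private add(): void {
    const worker = this.spawn();
    this.workers.push(worker);
    worker.onClose(() => {
      // Remove the closed worker from the pool.
      const index = this.workers.indexOf(worker);
      if (index !== -1) this.workers.splice(index, 1);
      // Respawn only on unexpected exits.
      if (!this.shuttingDown) this.add();
    });
  }
}
```

The shuttingDown flag is what separates an unexpected exit, which triggers a replacement, from an intentional stop, which does not.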
Monitoring:
- Periodically checks for tasks that have been queued for more than one minute and logs a warning if any are found.
Notes
- Workers are restarted automatically if they exit unexpectedly, unless the application is shutting down.
- Each worker controller manages its own pool of workers and task queue.
- The dispatch method is the primary way to send tasks to workers and receive results asynchronously.
- Proper error handling and logging are included for debugging and operational visibility.