Skip to content

Workers Module

This file defines the Workers class, which is responsible for managing all worker instances in the @sodacore/core library. It handles worker discovery, spawning, pooling, queue management, message dispatching, error handling, and automatic worker restarts.


Features

  • Worker Discovery: Finds all registered worker modules using metadata and prepares controllers for each.
  • Worker Pooling: Supports multiple worker instances per controller, configurable via the poolSize option.
  • Queue Management: Maintains a queue for each worker controller, dispatching tasks to available workers and handling timeouts.
  • IPC Handling: Manages message passing between the main process and workers, including initialization, method calls, and error handling.
  • Automatic Restart: Automatically restarts workers if they exit unexpectedly, unless the application is shutting down.
  • Monitoring: Periodically checks for backed-up queues and logs warnings if tasks are waiting too long.

Usage

typescript
import Workers from './module/workers';

const workers = new Workers(config);
await workers.init();
await workers.start();
// ... dispatch tasks to workers
await workers.stop();

API

Properties

  • controllers: Record<string, IWorkerControllerItem> Map of worker controllers, keyed by UID.
  • checkTimer: Timer | null Interval timer for checking backed-up queues.

Methods

async init(): Promise<void>

Discovers all worker modules, creates controller definitions, and starts a periodic check for backed-up queues.

async start(): Promise<void>

Starts all workers for each controller, creating the number of worker instances specified by poolSize.

async stop(): Promise<void>

Stops all workers and clears the periodic queue check timer.

getControllers(): Record<string, IWorkerControllerItem>

Returns all registered worker controllers.

getController(uid: string): IWorkerControllerItem | undefined

Returns a specific worker controller by its UID.

dispatch<T = any>(uid: string, method: string, params: unknown[] = [], timeout?: number): Promise<T>

Queues a method call for a worker, returning a promise that resolves or rejects with the result or error. Handles timeouts and queue management.


Private Methods

private createWorker(uid: string, name: string, path: string): Worker

Creates a new worker instance, sets up event listeners for open, error, close, and message events, and handles worker lifecycle and queue processing.

private processQueue(controller: IWorkerControllerItem): void

Finds an available worker in the pool, assigns the next queued task, and sends the message to the worker.

private checkQueue(): void

Periodically checks for tasks that have been queued for more than one minute and logs a warning if any are found.


How It Works

  1. Initialization:

    • Discovers all worker modules and creates controllers for each.
    • Sets up a periodic check for backed-up queues.
  2. Starting Workers:

    • For each controller, spawns the number of worker instances specified by poolSize.
    • Each worker is managed and tracked in the controller's workers array.
  3. Dispatching Tasks:

    • Tasks are queued per controller and dispatched to available workers.
    • Each worker processes one task at a time; results or errors are returned via IPC.
  4. Worker Lifecycle:

    • Handles worker events:
      • open: Initializes the worker.
      • error: Logs errors.
      • close: Removes the worker from the pool and restarts it if not shutting down.
      • message: Handles responses, resolves/rejects promises, and manages the queue.
  5. Monitoring:

    • Periodically checks for tasks that have been waiting too long and logs warnings.

Notes

  • Workers are restarted automatically if they exit unexpectedly, unless the application is shutting down.
  • Each worker controller manages its own pool of workers and task queue.
  • The dispatch method is the primary way to send tasks to workers and receive results asynchronously.
  • Proper error handling and logging are included for debugging and operational visibility.

Released under the Apache-2.0 License.