Threads Module

This file defines the Threads class, which is responsible for managing, spawning, and communicating with all threads in the @sodacore/core library. It handles thread lifecycle (init, start, stop), IPC (inter-process communication), controller dispatching, and queue management for asynchronous thread operations.


Features

  • Thread Discovery: Finds all thread controllers and thread modules registered in the global Registry.
  • Thread Spawning: Spawns a subprocess for each thread using Bun's spawn, passing the appropriate flags and bootstrap wrapper.
  • IPC Handling: Manages message passing between the main process and threads, including lifecycle commands and controller method dispatch.
  • Controller Dispatch: Routes incoming IPC messages to the correct controller and method, providing a ThreadContext for execution.
  • Queue Management: Maintains a queue for pending IPC requests, resolving or timing out promises as needed.
  • Graceful Shutdown: Stops all threads and cleans up resources on shutdown.

Usage

```typescript
import Threads from './module/threads';

// `config` is assumed to be the application's configuration object.
const threads = new Threads(config);
await threads.init();
await threads.start();
// ... later
await threads.stop();
```

API

Properties

  • threads: Map<string, Subprocess>. Maps thread names to their subprocess instances.
  • queue: Map<string, { resolve: any, timeout?: number, createdAt?: number }>. Holds pending IPC requests, keyed by UID.
  • controllers: Record<string, IThreadController>. Holds the registered thread controllers, keyed by namespace.
  • timer: any. Interval timer that cleans up timed-out queue items.

Methods

async init(): Promise<void>

  • Discovers all thread controllers and threads from the Registry.
  • Spawns subprocesses for each thread, passing the correct filename and flags.
  • Sends the @init command to each thread and waits for acknowledgment.
  • Starts a timer to clean up timed-out queue items.
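
The command and acknowledgment round trip described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: `CommandMessage`, `ThreadLike`, `sendCommand`, `acknowledge`, and `broadcast` are hypothetical names, and the `{ uid, command }` message shape is assumed rather than taken from `IThreadMessage`.

```typescript
// Hypothetical message shape; the real IThreadMessage may differ.
interface CommandMessage {
  uid: string;
  command: string;
}

// Minimal stand-in for a subprocess that can receive IPC messages.
interface ThreadLike {
  send(message: CommandMessage): void;
}

type Resolver = (value: unknown) => void;

// Pending acknowledgments, keyed by UID.
const pending = new Map<string, Resolver>();
let nextId = 0;

// Send a lifecycle command and return a promise that settles on acknowledgment.
function sendCommand(thread: ThreadLike, command: string): Promise<unknown> {
  const uid = `cmd-${nextId++}`;
  return new Promise<unknown>((resolve) => {
    pending.set(uid, resolve);
    thread.send({ uid, command });
  });
}

// Called when a thread replies; resolves the matching pending promise.
function acknowledge(uid: string, result: unknown): void {
  const resolve = pending.get(uid);
  if (!resolve) return;
  pending.delete(uid);
  resolve(result);
}

// Send the same command to every thread and wait for all acknowledgments.
function broadcast(threads: ThreadLike[], command: string): Promise<unknown[]> {
  return Promise.all(threads.map((thread) => sendCommand(thread, command)));
}
```

With helpers like these, `init()` and `start()` would amount to `await broadcast(threads, '@init')` and `await broadcast(threads, '@start')`. The sketch has no timeout, so a thread that never replies would leave the promise pending.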

async start(): Promise<void>

  • Sends the @start command to all threads and waits for acknowledgment.

async stop(): Promise<void>

  • Stops the timer, kills all thread subprocesses, and logs the shutdown.
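
A shutdown along these lines might look like the sketch below. `Killable` and `stopAll` are hypothetical names, and clearing the map after killing the subprocesses is an assumption that the description above does not state.

```typescript
// Minimal stand-in for the part of a subprocess that shutdown needs.
interface Killable {
  kill(): void;
}

// Clear the cleanup timer, kill every subprocess, and return the stopped names.
function stopAll(
  threads: Map<string, Killable>,
  timer: ReturnType<typeof setInterval> | null,
): string[] {
  if (timer !== null) clearInterval(timer);

  const stopped: string[] = [];
  threads.forEach((subprocess, name) => {
    subprocess.kill();
    stopped.push(name);
  });

  // Assumption: drop the references so stale subprocesses cannot be reused.
  threads.clear();
  return stopped;
}
```

The returned names could feed the shutdown log message.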

addToQueue(uid: string, resolve: (value: unknown) => void, timeout = 5000): void

  • Adds a promise resolver to the queue under the given UID. The timeout is optional and defaults to 5000 (presumably milliseconds).

getThread(name: string): Subprocess | null

  • Returns the subprocess registered under the given thread class name, or null if no such thread exists.

Private Methods

private createThread(thread: any): void

  • Spawns a new thread subprocess using the thread's filename and flags.
  • Registers the subprocess in the threads map.
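
The spawn-and-register step can be illustrated with an injected spawn function, which keeps the sketch runnable outside Bun. Everything here is an assumption made for illustration: `SubprocessLike`, `SpawnFn`, `ThreadDefinition`, the `wrapperPath` parameter, and the command layout are hypothetical. In a Bun process, a thin wrapper around `Bun.spawn` (which accepts a command array and an `ipc` handler option) could be passed as `spawn`.

```typescript
// Minimal stand-in for the subprocess handle.
interface SubprocessLike {
  send(message: unknown): void;
  kill(): void;
}

interface SpawnOptions {
  // Invoked for every IPC message the child sends to the parent.
  ipc: (message: unknown, subprocess: SubprocessLike) => void;
}

type SpawnFn = (cmd: string[], options: SpawnOptions) => SubprocessLike;

// Hypothetical description of a discovered thread.
interface ThreadDefinition {
  name: string;
  filename: string;
  flags: string[];
}

function createThread(
  spawn: SpawnFn,
  threads: Map<string, SubprocessLike>,
  onMessage: (message: unknown, subprocess: SubprocessLike) => void,
  wrapperPath: string,
  definition: ThreadDefinition,
): SubprocessLike {
  // Assumed command layout: runtime, flags, bootstrap wrapper, thread file.
  const cmd = ["bun", ...definition.flags, wrapperPath, definition.filename];
  const subprocess = spawn(cmd, { ipc: onMessage });

  // Register the subprocess under the thread's name for later lookup.
  threads.set(definition.name, subprocess);
  return subprocess;
}
```

Injecting `spawn` also makes the registration logic testable with a fake that records the command instead of starting a process.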

private async handleIpc(message: IThreadMessage, subprocess: Subprocess): Promise<void>

  • Handles incoming IPC messages from threads:
    • Resolves lifecycle commands (@init, @start, @stop) in the queue.
    • Handles log messages.
    • Dispatches controller method calls using the correct namespace and method.
    • Resolves or rejects queued promises based on message UID.
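
The routing described above could be sketched as shown below. The message shape is an assumption: this sketch encodes controller calls as `"<namespace>:<method>"` in a `command` field and uses a plain `context` value where the real module provides a `ThreadContext`. `ThreadMessage`, `Controller`, and `handleMessage` are hypothetical names.

```typescript
// Hypothetical message shape; the real IThreadMessage may carry other fields.
interface ThreadMessage {
  uid: string;
  command: string; // "@init" | "@start" | "@stop" | "log" | "<namespace>:<method>"
  context?: unknown;
}

type Controller = Record<string, (context: unknown) => unknown>;

const LIFECYCLE = ["@init", "@start", "@stop"];

async function handleMessage(
  message: ThreadMessage,
  controllers: Record<string, Controller>,
  queue: Map<string, (value: unknown) => void>,
  reply: (response: ThreadMessage) => void,
  log: (line: string) => void,
): Promise<void> {
  // Lifecycle acknowledgments resolve the pending promise with the same UID.
  if (LIFECYCLE.indexOf(message.command) !== -1) {
    const resolve = queue.get(message.uid);
    queue.delete(message.uid);
    if (resolve) resolve(message.context);
    return;
  }

  // Log messages are forwarded to the logger and need no reply.
  if (message.command === "log") {
    log(String(message.context));
    return;
  }

  // Anything else is treated as a controller call.
  const separator = message.command.indexOf(":");
  const namespace = separator === -1 ? "" : message.command.slice(0, separator);
  const method = message.command.slice(separator + 1);
  const controller = controllers[namespace];
  const handler = controller ? controller[method] : undefined;

  // Unknown targets get a null result instead of leaving the caller waiting.
  const result = handler ? await handler(message.context) : null;
  reply({ uid: message.uid, command: message.command, context: result });
}
```

Replying with the original UID lets the sending thread match the response to its own pending request.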

private handleQueueTimeout(): void

  • Periodically checks the queue for timed-out items and resolves them with null.
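
Taken together, `addToQueue` and the timeout sweep could work roughly as in this sketch. The entry fields mirror the documented `queue` type, and the 5000 default comes from the `addToQueue` signature. The millisecond unit, the injectable `now` parameter (added for testability), and the name `sweepQueue` are assumptions.

```typescript
interface QueueItem {
  resolve: (value: unknown) => void;
  timeout: number;   // assumed to be milliseconds
  createdAt: number; // epoch milliseconds
}

const queue = new Map<string, QueueItem>();

// Register a resolver for a pending request.
function addToQueue(
  uid: string,
  resolve: (value: unknown) => void,
  timeout = 5000,
  now = Date.now(),
): void {
  queue.set(uid, { resolve, timeout, createdAt: now });
}

// Resolve every expired item with null, remove it, and return the expired UIDs.
function sweepQueue(now = Date.now()): string[] {
  const expired: string[] = [];
  queue.forEach((item, uid) => {
    if (now - item.createdAt >= item.timeout) expired.push(uid);
  });

  expired.forEach((uid) => {
    const item = queue.get(uid);
    queue.delete(uid);
    if (item) item.resolve(null);
  });
  return expired;
}
```

A timer such as `setInterval(() => sweepQueue(), 1000)` would run the sweep periodically. The one-second interval is illustrative only. Resolving with null rather than rejecting means callers must check for a null result to detect a timeout.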

How It Works

  1. Initialization: Discovers controllers and threads, spawns subprocesses, and sends initialization commands.
  2. IPC Communication: Handles messages from threads, dispatches controller methods, and manages responses.
  3. Queue Management: Tracks pending requests and cleans up timed-out promises.
  4. Lifecycle: Provides methods to start and stop all threads cleanly.

Notes

  • Threads must be registered with the appropriate metadata and decorators to be discovered and managed.
  • Controllers must be registered with a namespace and expose methods to handle thread commands.
  • The module uses Bun's spawn and IPC features for process management and communication.
  • Proper error handling and logging are included for debugging and traceability.

Released under the Apache-2.0 License.