Threads Module
This file defines the Threads
class, which is responsible for managing, spawning, and communicating with all threads in the @sodacore/core
library. It handles thread lifecycle (init, start, stop), IPC (inter-process communication), controller dispatching, and queue management for asynchronous thread operations.
Features
- Thread Discovery: Finds all thread controllers and thread modules registered in the global
Registry
. - Thread Spawning: Spawns a subprocess for each thread using Bun's
spawn
, passing the appropriate flags and bootstrap wrapper. - IPC Handling: Manages message passing between the main process and threads, including lifecycle commands and controller method dispatch.
- Controller Dispatch: Routes incoming IPC messages to the correct controller and method, providing a
ThreadContext
for execution. - Queue Management: Maintains a queue for pending IPC requests, resolving or timing out promises as needed.
- Graceful Shutdown: Stops all threads and cleans up resources on shutdown.
Usage
typescript
import Threads from './module/threads';
const threads = new Threads(config);
await threads.init();
await threads.start();
// ... later
await threads.stop();
API
Properties
- threads:
Map<string, Subprocess>
Map of thread names to their subprocess instances. - queue:
Map<string, { resolve: any, timeout?: number, createdAt?: number }>
Map of pending IPC requests, keyed by UID. - controllers:
Record<string, IThreadController>
Registered thread controllers, keyed by namespace. - timer:
any
Interval timer for cleaning up timed-out queue items.
Methods
async init(): Promise<void>
- Discovers all thread controllers and threads from the
Registry
. - Spawns subprocesses for each thread, passing the correct filename and flags.
- Sends the
@init
command to each thread and waits for acknowledgment. - Starts a timer to clean up timed-out queue items.
async start(): Promise<void>
- Sends the
@start
command to all threads and waits for acknowledgment.
async stop(): Promise<void>
- Stops the timer, kills all thread subprocesses, and logs the shutdown.
addToQueue(uid: string, resolve: (value: unknown) => void, timeout = 5000): void
- Adds a promise resolver to the queue for a given UID, with an optional timeout.
getThread(name: string): Subprocess | null
- Retrieves a thread subprocess by its class name.
Private Methods
private createThread(thread: any): void
- Spawns a new thread subprocess using the thread's filename and flags.
- Registers the subprocess in the
threads
map.
private async handleIpc(message: IThreadMessage, subprocess: Subprocess): Promise<void>
- Handles incoming IPC messages from threads:
- Resolves lifecycle commands (
@init
,@start
,@stop
) in the queue. - Handles log messages.
- Dispatches controller method calls using the correct namespace and method.
- Resolves or rejects queued promises based on message UID.
- Resolves lifecycle commands (
private handleQueueTimeout(): void
- Periodically checks the queue for timed-out items and resolves them with
null
.
How It Works
Initialization:
- Discovers controllers and threads, spawns subprocesses, and sends initialization commands.
IPC Communication:
- Handles messages from threads, dispatches controller methods, and manages responses.
Queue Management:
- Tracks pending requests and cleans up timed-out promises.
Lifecycle:
- Provides methods to start and stop all threads cleanly.
Notes
- Threads must be registered with the appropriate metadata and decorators to be discovered and managed.
- Controllers must be registered with a namespace and expose methods to handle thread commands.
- The module uses Bun's
spawn
and IPC features for process management and communication. - Proper error handling and logging are included for debugging and traceability.