Queue¶
Katal's queue system provides background job processing with retry/backoff, scheduling, and a swappable store interface. The default store is fully in-memory — no external dependencies needed for development or single-instance production.
Architecture¶
| Component | Purpose |
|---|---|
QueueStore |
Interface — store/retrieve jobs |
InMemoryQueue |
Built-in in-memory QueueStore |
QueueWorker |
Polls the queue and dispatches registered handlers |
QueueScheduler |
Enqueue recurring or one-shot delayed jobs |
QueueStoreAdapter |
Wrap third-party queues to satisfy QueueStore |
Enqueue Jobs¶
import { Application } from "katal";
const app = new Application({ port: 3000 });
// app uses an in-memory queue by default (lazy-initialised)
const queue = app.getQueue();
await queue.enqueue("send-email", { userId: "u-1", to: "alice@example.com" });
With Retry / Backoff¶
await queue.enqueue("send-email", { userId: "u-1" }, {
maxAttempts: 5,
retry: {
strategy: "exponential", // "none" | "fixed" | "exponential"
delayMs: 250,
maxDelayMs: 10_000,
jitter: true,
jitterFactor: 0.2, // ±20% randomness on each delay
},
});
Delayed Job¶
// Deliver this job 30 seconds from now
await queue.enqueue("send-email", { userId: "u-1" }, { delayMs: 30_000 });
Process Jobs¶
Register handlers and start polling:
import { QueueWorker } from "katal";
const worker = new QueueWorker(app.getQueue(), {
pollIntervalMs: 100, // how often to check the queue
concurrency: 2, // max concurrent jobs
});
worker.register<{ userId: string; to: string }>("send-email", async (job) => {
await mailer.send({ to: job.payload.to });
});
worker.register<{ scope: string }>("cleanup", async (job) => {
await cache.flush(job.payload.scope);
});
worker.start();
// Graceful shutdown
process.on("SIGTERM", async () => {
await worker.stop();
});
Scheduler¶
One-shot and recurring jobs:
import { QueueScheduler } from "katal";
const scheduler = new QueueScheduler(app.getQueue());
// Run once after a delay
await scheduler.scheduleOnce("digest", { scope: "daily" }, 60_000);
// Repeat every N milliseconds
const recurring = scheduler.every("cleanup", { scope: "cache" }, 15_000);
// Cron-style intervals (second-granularity)
const cronJob = scheduler.everyCron("report", { type: "weekly" }, "@every 15s");
// Also supports: "*/5 * * * * *" (every 5 seconds)
// Stop individual schedulers
recurring.stop();
cronJob.stop();
// Stop all at once
scheduler.stopAll();
Supported Cron Expressions¶
| Expression | Meaning |
|---|---|
@every 5s |
every 5 seconds |
@every 1m |
every minute |
@every 1h |
every hour |
*/5 * * * * * |
every 5 seconds (6-field syntax) |
Application Integration¶
Application manages the queue store via convenience methods:
// Get the active queue (initialises in-memory queue on first call)
const queue = app.getQueue();
// Check which driver is active
const driver = app.getQueueDriver(); // "memory" | "custom"
// Swap in a custom store (e.g. Redis-backed)
app.useQueue(myRedisQueueStore);
// Switch back to built-in in-memory queue
app.useInMemoryQueue();
Queue initialisation is lazy — the in-memory queue is only created when
app.getQueue()is first called.
Custom Queue Adapter¶
Wrap any third-party queue library with QueueStoreAdapter:
import { createQueueStoreAdapter } from "katal";
const store = createQueueStoreAdapter({
async enqueue(name, payload, options) { /* ... */ },
async dequeue() { /* ... */ },
async complete(id) { /* ... */ },
async fail(id, error) { /* ... */ },
async get(id) { /* ... */ },
async list(name) { /* ... */ },
});
app.useQueue(store);
Testing with Queues¶
Use FakeClock to control time without waiting. See Testing for full details:
import { AppFactory, FakeClock } from "katal";
const clock = new FakeClock();
const queue = new InMemoryQueue({ now: () => clock.now() });
// ... enqueue jobs, advance clock, then inspect queue state
See Also¶
Components¶
InMemoryQueue: stores and tracks jobs in memory.QueueWorker: polls queue and dispatches handlers.JobRecord/JobOptions: typed job metadata.
Enqueue Jobs¶
import { InMemoryQueue } from "katal";
const queue = new InMemoryQueue();
await queue.enqueue("send-email", { userId: "u-1" });
Retry and Backoff¶
await queue.enqueue("send-email", { userId: "u-1" }, {
maxAttempts: 5,
retry: {
strategy: "exponential", // "none" | "fixed" | "exponential"
delayMs: 250,
maxDelayMs: 10_000,
jitter: true,
jitterFactor: 0.2,
},
});
Delayed Job¶
await queue.enqueue("send-email", { userId: "u-1" }, { delayMs: 30_000 });
Process Jobs¶
import { InMemoryQueue, QueueWorker } from "katal";
const queue = new InMemoryQueue();
const worker = new QueueWorker(queue, { pollIntervalMs: 25, concurrency: 2 });
worker.register<{ userId: string }>("send-email", async (job) => {
console.log("send email for", job.payload.userId);
});
worker.start();
Scheduler¶
import { InMemoryQueue, QueueScheduler } from "katal";
const queue = new InMemoryQueue();
const scheduler = new QueueScheduler(queue);
await scheduler.scheduleOnce("digest", { scope: "daily" }, 60_000);
const recurring = scheduler.every("cleanup", { scope: "cache" }, 15_000);
// cron-like syntax
const recurringCron = scheduler.everyCron(
"cleanup",
{ scope: "cache" },
"@every 15s",
);
// also supports second-step format: */5 * * * * *
// later
recurring.stop();
recurringCron.stop();
scheduler.stopAll();
Application Integration¶
Application now binds a queue store in the container by default.
Queue initialization is lazy: the in-memory queue is only created when queue features are actually used.
app.getQueue()returns the active queue store.app.useQueue(customStore)swaps in a custom implementation.app.useInMemoryQueue()switches back to built-in memory queue.app.getQueueDriver()returns"memory"or"custom".
This keeps Redis-backed queue support additive in the next PR.
Recommended Strategy¶
Use the built-in queue by default, then migrate to custom queue infrastructure as you scale.
- Start with
memoryfor local/dev and single-instance production. - Switch to
customwhen you need durability, multi-instance processing, or advanced broker features.
Common migration triggers:
- More than one app instance is running.
- Jobs must survive process restarts.
- You need dead-letter queues, delayed delivery, or broker-level observability.
When NODE_ENV=production and the memory queue is active, Katal logs a one-time warning to guide migration.
Production Queue Checklist¶
Use this checklist to pick queue strategy by workload profile.
Stage 1: Single-instance apps (early stage)¶
- Recommended driver:
memory - Typical profile:
- one app instance
- jobs are low value or easy to re-run
- brief downtime/restarts are acceptable
- Suggested setup:
app.useInMemoryQueue()- keep handlers idempotent
Stage 2: Growth apps (multi-instance or durability needs)¶
- Recommended driver:
custom - Typical profile:
- 2+ app instances
- job durability required across restarts/deploys
- retries, delayed jobs, and backpressure become important
- Suggested setup:
- bridge your queue library with
createQueueStoreAdapter(...) - call
app.useQueue(customStore) - monitor queue depth and oldest-job age
- bridge your queue library with
Stage 3: High-scale or regulated workloads¶
- Recommended driver:
customwith managed broker/infrastructure - Typical profile:
- strict SLOs, audit/compliance requirements
- dead-letter policies, poison message handling, replay workflows
- strong operational visibility and alerting required
- Suggested setup:
- use broker-native observability and retention controls
- separate worker processes from API instances
- define runbooks for retry storms and backlog growth
Quick decision rules¶
- If jobs must survive restarts: use
custom. - If multiple app instances process jobs: use
custom. - If workload is simple and non-critical:
memoryis fine.
Bring Your Own Queue Library¶
You can keep your existing queue stack (BullMQ, Bee-Queue, custom broker wrapper, etc.) and adapt it via QueueStore.
Option A: Implement QueueStore directly¶
import type { QueueStore } from "katal";
class MyQueueStore implements QueueStore {
async enqueue(name, payload, options) {
// bridge to your own queue producer
return {
id: "external-id",
name,
payload,
status: "pending",
attempts: 0,
maxAttempts: options?.maxAttempts ?? 1,
createdAt: Date.now(),
updatedAt: Date.now(),
};
}
async dequeue() {
return null;
}
async complete() {}
async fail() {}
async get() {
return null;
}
async list() {
return [];
}
}
Option B: Use createQueueStoreAdapter(...)¶
import { createQueueStoreAdapter } from "katal";
const queue = createQueueStoreAdapter({
enqueue: async (name, payload) => {
// send to external queue and return a job record shape
return {
id: crypto.randomUUID(),
name,
payload,
status: "pending",
attempts: 0,
maxAttempts: 1,
createdAt: Date.now(),
updatedAt: Date.now(),
};
},
list: async () => [],
});
Then wire it into Katal:
app.useQueue(queue);
Note: if you use QueueWorker, implement dequeue, complete, and fail in your adapter.