Queue

Katal's queue system provides background job processing with retry/backoff, scheduling, and a swappable store interface. The default store is fully in-memory — no external dependencies needed for development or single-instance production.


Architecture

Component Purpose
QueueStore Interface — store/retrieve jobs
InMemoryQueue Built-in in-memory QueueStore
QueueWorker Polls the queue and dispatches registered handlers
QueueScheduler Enqueue recurring or one-shot delayed jobs
QueueStoreAdapter Wrap third-party queues to satisfy QueueStore

Enqueue Jobs

import { Application } from "katal";

const app = new Application({ port: 3000 });

// app uses an in-memory queue by default (lazy-initialised)
const queue = app.getQueue();

await queue.enqueue("send-email", { userId: "u-1", to: "alice@example.com" });

With Retry / Backoff

await queue.enqueue("send-email", { userId: "u-1" }, {
  maxAttempts: 5,
  retry: {
    strategy:    "exponential", // "none" | "fixed" | "exponential"
    delayMs:     250,
    maxDelayMs:  10_000,
    jitter:      true,
    jitterFactor: 0.2,          // ±20% randomness on each delay
  },
});

Delayed Job

// Deliver this job 30 seconds from now
await queue.enqueue("send-email", { userId: "u-1" }, { delayMs: 30_000 });

Process Jobs

Register handlers and start polling:

import { QueueWorker } from "katal";

const worker = new QueueWorker(app.getQueue(), {
  pollIntervalMs: 100,   // how often to check the queue
  concurrency:    2,     // max concurrent jobs
});

worker.register<{ userId: string; to: string }>("send-email", async (job) => {
  await mailer.send({ to: job.payload.to });
});

worker.register<{ scope: string }>("cleanup", async (job) => {
  await cache.flush(job.payload.scope);
});

worker.start();

// Graceful shutdown
process.on("SIGTERM", async () => {
  await worker.stop();
});

Scheduler

One-shot and recurring jobs:

import { QueueScheduler } from "katal";

const scheduler = new QueueScheduler(app.getQueue());

// Run once after a delay
await scheduler.scheduleOnce("digest", { scope: "daily" }, 60_000);

// Repeat every N milliseconds
const recurring = scheduler.every("cleanup", { scope: "cache" }, 15_000);

// Cron-style intervals (second-granularity)
const cronJob = scheduler.everyCron("report", { type: "weekly" }, "@every 15s");
// Also supports: "*/5 * * * * *" (every 5 seconds)

// Stop individual schedulers
recurring.stop();
cronJob.stop();

// Stop all at once
scheduler.stopAll();

Supported Cron Expressions

Expression Meaning
@every 5s every 5 seconds
@every 1m every minute
@every 1h every hour
*/5 * * * * * every 5 seconds (6-field syntax)

Application Integration

Application manages the queue store via convenience methods:

// Get the active queue (initialises in-memory queue on first call)
const queue = app.getQueue();

// Check which driver is active
const driver = app.getQueueDriver(); // "memory" | "custom"

// Swap in a custom store (e.g. Redis-backed)
app.useQueue(myRedisQueueStore);

// Switch back to built-in in-memory queue
app.useInMemoryQueue();

Queue initialisation is lazy — the in-memory queue is only created when app.getQueue() is first called.


Custom Queue Adapter

Wrap any third-party queue library with QueueStoreAdapter:

import { createQueueStoreAdapter } from "katal";

const store = createQueueStoreAdapter({
  async enqueue(name, payload, options) { /* ... */ },
  async dequeue()                        { /* ... */ },
  async complete(id)                     { /* ... */ },
  async fail(id, error)                  { /* ... */ },
  async get(id)                          { /* ... */ },
  async list(name)                       { /* ... */ },
});

app.useQueue(store);

Testing with Queues

Use FakeClock to control time without waiting. See Testing for full details:

import { AppFactory, FakeClock } from "katal";

const clock = new FakeClock();
const queue  = new InMemoryQueue({ now: () => clock.now() });

// ... enqueue jobs, advance clock, then inspect queue state

See Also

Components

  • InMemoryQueue: stores and tracks jobs in memory.
  • QueueWorker: polls queue and dispatches handlers.
  • JobRecord/JobOptions: typed job metadata.

Enqueue Jobs

import { InMemoryQueue } from "katal";

const queue = new InMemoryQueue();
await queue.enqueue("send-email", { userId: "u-1" });

Retry and Backoff

await queue.enqueue("send-email", { userId: "u-1" }, {
    maxAttempts: 5,
    retry: {
        strategy: "exponential", // "none" | "fixed" | "exponential"
        delayMs: 250,
        maxDelayMs: 10_000,
        jitter: true,
        jitterFactor: 0.2,
    },
});

Delayed Job

await queue.enqueue("send-email", { userId: "u-1" }, { delayMs: 30_000 });

Process Jobs

import { InMemoryQueue, QueueWorker } from "katal";

const queue = new InMemoryQueue();
const worker = new QueueWorker(queue, { pollIntervalMs: 25, concurrency: 2 });

worker.register<{ userId: string }>("send-email", async (job) => {
    console.log("send email for", job.payload.userId);
});

worker.start();

Scheduler

import { InMemoryQueue, QueueScheduler } from "katal";

const queue = new InMemoryQueue();
const scheduler = new QueueScheduler(queue);

await scheduler.scheduleOnce("digest", { scope: "daily" }, 60_000);

const recurring = scheduler.every("cleanup", { scope: "cache" }, 15_000);

// cron-like syntax
const recurringCron = scheduler.everyCron(
    "cleanup",
    { scope: "cache" },
    "@every 15s",
);

// also supports second-step format: */5 * * * * *

// later
recurring.stop();
recurringCron.stop();
scheduler.stopAll();

Application Integration

Application now binds a queue store in the container by default.

Queue initialization is lazy: the in-memory queue is only created when queue features are actually used.

  • app.getQueue() returns the active queue store.
  • app.useQueue(customStore) swaps in a custom implementation.
  • app.useInMemoryQueue() switches back to built-in memory queue.
  • app.getQueueDriver() returns "memory" or "custom".

This keeps Redis-backed queue support additive in the next PR.

Use the built-in queue by default, then migrate to custom queue infrastructure as you scale.

  • Start with memory for local/dev and single-instance production.
  • Switch to custom when you need durability, multi-instance processing, or advanced broker features.

Common migration triggers:

  • More than one app instance is running.
  • Jobs must survive process restarts.
  • You need dead-letter queues, delayed delivery, or broker-level observability.

When NODE_ENV=production and the memory queue is active, Katal logs a one-time warning to guide migration.

Production Queue Checklist

Use this checklist to pick queue strategy by workload profile.

Stage 1: Single-instance apps (early stage)

  • Recommended driver: memory
  • Typical profile:
    • one app instance
    • jobs are low value or easy to re-run
    • brief downtime/restarts are acceptable
  • Suggested setup:
    • app.useInMemoryQueue()
    • keep handlers idempotent

Stage 2: Growth apps (multi-instance or durability needs)

  • Recommended driver: custom
  • Typical profile:
    • 2+ app instances
    • job durability required across restarts/deploys
    • retries, delayed jobs, and backpressure become important
  • Suggested setup:
    • bridge your queue library with createQueueStoreAdapter(...)
    • call app.useQueue(customStore)
    • monitor queue depth and oldest-job age

Stage 3: High-scale or regulated workloads

  • Recommended driver: custom with managed broker/infrastructure
  • Typical profile:
    • strict SLOs, audit/compliance requirements
    • dead-letter policies, poison message handling, replay workflows
    • strong operational visibility and alerting required
  • Suggested setup:
    • use broker-native observability and retention controls
    • separate worker processes from API instances
    • define runbooks for retry storms and backlog growth

Quick decision rules

  • If jobs must survive restarts: use custom.
  • If multiple app instances process jobs: use custom.
  • If workload is simple and non-critical: memory is fine.

Bring Your Own Queue Library

You can keep your existing queue stack (BullMQ, Bee-Queue, custom broker wrapper, etc.) and adapt it via QueueStore.

Option A: Implement QueueStore directly

import type { QueueStore } from "katal";

class MyQueueStore implements QueueStore {
    async enqueue(name, payload, options) {
        // bridge to your own queue producer
        return {
            id: "external-id",
            name,
            payload,
            status: "pending",
            attempts: 0,
            maxAttempts: options?.maxAttempts ?? 1,
            createdAt: Date.now(),
            updatedAt: Date.now(),
        };
    }

    async dequeue() {
        return null;
    }

    async complete() {}
    async fail() {}
    async get() {
        return null;
    }
    async list() {
        return [];
    }
}

Option B: Use createQueueStoreAdapter(...)

import { createQueueStoreAdapter } from "katal";

const queue = createQueueStoreAdapter({
    enqueue: async (name, payload) => {
        // send to external queue and return a job record shape
        return {
            id: crypto.randomUUID(),
            name,
            payload,
            status: "pending",
            attempts: 0,
            maxAttempts: 1,
            createdAt: Date.now(),
            updatedAt: Date.now(),
        };
    },
    list: async () => [],
});

Then wire it into Katal:

app.useQueue(queue);

Note: if you use QueueWorker, implement dequeue, complete, and fail in your adapter.