Node.js Background Jobs: Complete Theory & Practice Guide
Build reliable async job systems with queues, workers, retries, and production controls.
Bull + Redis · Worker Threads · Retries
Table of Contents
1. Theory: Background Jobs Fundamentals
2. Basic: Simple Job Patterns
3. Advanced: Bull Queue
4. Advanced: Job Workers
5. Best Practices
1. Theory: Background Jobs Fundamentals
What Are Background Jobs?
Background jobs are tasks executed asynchronously outside the main request-response cycle. They enable applications to defer time-consuming operations, improve user experience, and handle failures gracefully.
| Concept | Description | Why It Matters |
|---|---|---|
| Queue | Data structure storing pending jobs | Decouples job creation from execution |
| Worker | Process that executes jobs | Enables parallel processing |
| Producer | Code that creates/adds jobs | Separates job definition from execution |
| Consumer | Code that processes jobs | Handles actual business logic |
| Broker | Middleware managing job distribution | Redis, RabbitMQ, or database |
| Dead Letter Queue | Failed jobs storage | Prevents infinite retry loops |
┌─────────────────────────────────────────────────────────────┐
│                    Request-Response Flow                    │
├─────────────────────────────────────────────────────────────┤
│  Immediate (Sync)               Deferred (Background)       │
│  • Authentication               • Email notifications       │
│  • Data validation              • Image/video processing    │
│  • Simple CRUD                  • PDF generation            │
│  • Cache lookups                • Data export/reports       │
│  • API routing                  • Webhook delivery          │
│                                 • Database backups          │
│                                 • Batch data processing     │
│                                 • Machine learning inference│
└─────────────────────────────────────────────────────────────┘
┌──────────┐
│  Added   │  (Producer creates job)
└────┬─────┘
     ▼
┌──────────┐
│ Waiting  │  (In queue, not yet processed)
└────┬─────┘
     ▼
┌──────────┐      ┌──────────┐
│ Delayed  │─────▶│  Active  │  (Worker picks up)
└──────────┘      └────┬─────┘
                       │
           ┌───────────┼────────────┐
           ▼           ▼            ▼
      ┌────────┐  ┌──────────┐  ┌────────┐
      │Complete│  │  Failed  │  │Retrying│
      └────────┘  └────┬─────┘  └───┬────┘
                       │            │
                       ▼            │
                 ┌───────────┐      │
                 │Dead Letter│◀─────┘
                 │   Queue   │  (After max retries)
                 └───────────┘
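One way to read the lifecycle above is as a transition table. This sketch follows Bull's documented lifecycle (delayed jobs re-enter the waiting list, and a retry re-queues the job); the state names are taken from the diagram:

```javascript
// Allowed job-state transitions, derived from the lifecycle diagram.
const TRANSITIONS = {
  added:    ['waiting', 'delayed'],
  waiting:  ['active'],
  delayed:  ['waiting'],
  active:   ['completed', 'failed'],
  failed:   ['retrying', 'dead-letter'],
  retrying: ['waiting'],
};

function canTransition(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}

console.log(canTransition('active', 'completed')); // true
console.log(canTransition('completed', 'active')); // false (terminal state)
```

Encoding the transitions explicitly makes it easy to reject illegal state changes in your own queue implementation.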
Job Processing Patterns
// 1) Fire-and-Forget
queue.add('send-email', emailData);
res.json({ message: 'Email will be sent' });
// 2) Delayed Execution
queue.add('reminder', data, { delay: 3600000 }); // 1 hour
// 3) Retry with Backoff
queue.add('api-call', data, {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 }
});
// 4) Priority Queue
queue.add('critical', data, { priority: 10 }); // High priority
queue.add('normal', data, { priority: 5 }); // Medium
queue.add('background', data, { priority: 1 }); // Low
Queue Library Comparison
| Feature | Bull (Redis) | RabbitMQ | Agenda (MongoDB) | Native (Memory) |
|---|---|---|---|---|
| Persistence | Yes | Yes | Yes | No |
| Distributed | Yes | Yes | Yes | No |
| Priority | Yes | Yes | Yes | Manual |
| Delayed jobs | Yes | Yes | Yes | Manual |
| Job retries | Built-in | Manual | Built-in | Manual |
| Visibility | Dashboard | Management UI | Agendash (add-on) | No |
| Use case | Node.js apps | Enterprise | Node.js + Mongo | Development |
2. Basic: Simple Job Patterns
// simple-queue.js
class MemoryQueue {
constructor() {
this.queue = [];
this.processing = false;
this.handlers = new Map();
}
register(type, handler) { this.handlers.set(type, handler); }
add(type, data, options = {}) {
const job = {
id: `${Date.now()}-${Math.random()}`,
type, data,
attempts: 0,
maxAttempts: options.attempts || 3,
priority: options.priority || 0
};
const index = this.queue.findIndex(j => j.priority < job.priority);
if (index === -1) this.queue.push(job);
else this.queue.splice(index, 0, job);
this.process();
return job.id;
}
async process() {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
while (this.queue.length > 0) {
const job = this.queue.shift();
const handler = this.handlers.get(job.type);
if (!handler) continue;
job.attempts++;
try {
await handler(job.data);
console.log(`✅ Job ${job.id} completed`);
} catch (error) {
if (job.attempts < job.maxAttempts) {
const delay = Math.pow(2, job.attempts) * 1000;
setTimeout(() => {
this.queue.unshift(job);
this.process(); // restart the loop in case it drained and exited
}, delay);
} else {
console.error(`❌ Job ${job.id} failed permanently`);
}
}
}
this.processing = false;
}
}
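The priority splice in `add` keeps the in-memory queue sorted with the highest priority first. Isolated from the class, the insertion rule behaves like this:

```javascript
// Priority insertion logic from MemoryQueue.add, isolated:
// higher-priority jobs are spliced in ahead of lower-priority ones.
function insertByPriority(queue, job) {
  const index = queue.findIndex((j) => j.priority < job.priority);
  if (index === -1) queue.push(job);
  else queue.splice(index, 0, job);
  return queue;
}

const q = [];
insertByPriority(q, { id: 'a', priority: 1 });
insertByPriority(q, { id: 'b', priority: 5 });
insertByPriority(q, { id: 'c', priority: 3 });
console.log(q.map((j) => j.id)); // ['b', 'c', 'a']
```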
// event-jobs.js
const EventEmitter = require('events');
class JobProcessor extends EventEmitter {
constructor(options = {}) {
super();
this.concurrency = options.concurrency || 3;
this.active = 0;
this.queue = [];
this.registerHandlers();
}
registerHandlers() {
// Built-in job types
this.on('job:email', async (job) => {
console.log(`📧 Sending to ${job.data.to}`);
await this.sleep(100);
return { messageId: `msg_${Date.now()}` };
});
this.on('job:image', async (job) => {
console.log(`🖼️ Processing ${job.data.filename}`);
await this.sleep(500);
return { output: `processed_${job.data.filename}` };
});
this.on('job:export', async (job) => {
console.log(`📊 Exporting ${job.data.query}`);
await this.sleep(1000);
return { rows: 1250 };
});
}
async add(type, data, options = {}) {
return new Promise((resolve, reject) => {
const id = `job_${Date.now()}_${Math.random().toString(36).slice(2)}`;
this.queue.push({ id, type, data, resolve, reject, options });
this.process();
});
}
async process() {
if (this.active >= this.concurrency || this.queue.length === 0) return;
this.active++;
const job = this.queue.shift();
const startTime = Date.now();
this.emit('job-start', { id: job.id, type: job.type });
try {
const result = await this.executeJob(job);
const duration = Date.now() - startTime;
this.emit('job-complete', { type: job.type, duration, result });
job.resolve(result);
} catch (error) {
this.emit('job-error', { type: job.type, error: error.message });
job.reject(error);
}
this.active--;
this.process();
}
async executeJob(job) {
// A custom handler passed with the job takes precedence
if (job.options.handler) {
return await job.options.handler(job.data);
}
// emit() returns a boolean, not a promise, so invoke the
// registered listener directly and race it against a timeout
const [handler] = this.listeners(`job:${job.type}`);
if (!handler) throw new Error(`No handler for job type "${job.type}"`);
let timer;
const timeout = new Promise((_, reject) => {
timer = setTimeout(() => reject(new Error('Job timeout')), job.options.timeout || 30000);
});
try {
return await Promise.race([handler(job), timeout]);
} finally {
clearTimeout(timer);
}
}
sleep(ms) { return new Promise(r => setTimeout(r, ms)); }
}
// Usage
const jobs = new JobProcessor({ concurrency: 5 });
jobs.on('job-start', ({ type }) => console.log(`Started ${type}`));
jobs.on('job-complete', ({ type, duration }) =>
console.log(`Completed ${type} in ${duration}ms`));
// Add multiple jobs
Promise.all([
jobs.add('email', { to: 'alice@example.com' }),
jobs.add('email', { to: 'bob@example.com' }),
jobs.add('image', { filename: 'photo.jpg' }),
jobs.add('export', { query: 'SELECT * FROM users' }),
jobs.add('email', { to: 'charlie@example.com' })
]).then(results => console.log('All jobs done', results.length));
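The concurrency gate in `process` (the `active >= this.concurrency` check plus the recursive call on completion) generalizes to a small limiter helper. This is a sketch of the same idea, not part of JobProcessor:

```javascript
// At most `limit` tasks run concurrently; the rest wait in FIFO order.
function createLimiter(limit) {
  let active = 0;
  const waiting = [];
  const next = () => {
    if (active >= limit || waiting.length === 0) return;
    active++;
    const { fn, resolve, reject } = waiting.shift();
    fn().then(resolve, reject).finally(() => {
      active--;
      next(); // pull the next waiting task, mirroring process()
    });
  };
  return (fn) => new Promise((resolve, reject) => {
    waiting.push({ fn, resolve, reject });
    next();
  });
}

// Track peak concurrency to verify the gate holds
const limit2 = createLimiter(2);
let running = 0, peak = 0;
const task = () => {
  running++;
  peak = Math.max(peak, running);
  return new Promise((r) => setTimeout(() => { running--; r(); }, 10));
};
Promise.all([1, 2, 3, 4, 5].map(() => limit2(task)))
  .then(() => console.log('peak concurrency:', peak)); // 2
```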
// cron-jobs.js
class CronScheduler {
constructor() { this.jobs = new Map(); }
parseCron(expression) {
const [minute, hour, day, month, weekday] = expression.split(' ');
return () => {
const now = new Date();
return this.matchField(minute, now.getMinutes()) &&
this.matchField(hour, now.getHours()) &&
this.matchField(day, now.getDate()) &&
this.matchField(month, now.getMonth() + 1) &&
this.matchField(weekday, now.getDay());
};
}
matchField(pattern, value) {
if (pattern === '*') return true;
if (pattern.includes(',')) return pattern.split(',').map(Number).includes(value);
if (pattern.includes('/')) {
const [_, step] = pattern.split('/');
return value % parseInt(step, 10) === 0;
}
return parseInt(pattern, 10) === value;
}
schedule(cronExpr, task, name) {
const shouldRun = this.parseCron(cronExpr);
let lastRun = 0;
const interval = setInterval(() => {
const now = Date.now();
if (shouldRun() && now - lastRun > 60000) {
lastRun = now;
task().catch(console.error);
}
}, 60000);
this.jobs.set(name, { interval, cronExpr });
return name;
}
stop(name) {
const job = this.jobs.get(name);
if (job) {
clearInterval(job.interval);
this.jobs.delete(name);
}
}
}
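`matchField` resolves the common cron field forms as shown below. Note that this sketch does not handle ranges such as `1-5`; a production scheduler (or the `node-cron` package) covers the full syntax:

```javascript
// matchField logic from CronScheduler, restated so the snippet runs standalone.
function matchField(pattern, value) {
  if (pattern === '*') return true;                 // wildcard
  if (pattern.includes(',')) {
    return pattern.split(',').map(Number).includes(value); // list: "0,30"
  }
  if (pattern.includes('/')) {
    const [, step] = pattern.split('/');            // step: "*/15"
    return value % parseInt(step, 10) === 0;
  }
  return parseInt(pattern, 10) === value;           // exact: "5"
}

console.log(matchField('*', 42));    // true
console.log(matchField('0,30', 30)); // true
console.log(matchField('*/15', 45)); // true
console.log(matchField('5', 6));     // false
```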
3. Advanced: Bull Queue
npm install bull ioredis
// bull-queue.js
const Bull = require('bull');
class JobQueue {
constructor(name, options = {}) {
this.queue = new Bull(name, {
redis: { host: options.redisHost || 'localhost', port: options.redisPort || 6379 },
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 100,
removeOnFail: 500
}
});
this.setupEvents();
}
setupEvents() {
this.queue.on('completed', (job, result) => console.log(`✅ Job ${job.id} completed:`, result));
this.queue.on('failed', (job, err) => console.error(`❌ Job ${job.id} failed:`, err.message));
this.queue.on('stalled', (job) => console.warn(`⚠️ Job ${job.id} stalled`));
}
async add(data, options = {}) {
return this.queue.add(data, { priority: options.priority || 0, delay: options.delay || 0, ...options });
}
process(handler, concurrency = 1) {
this.queue.process(concurrency, async (job) => {
job.progress(10);
const result = await handler(job.data);
job.progress(100);
return result;
});
}
async getStats() {
const [waiting, active, completed, failed, delayed] = await Promise.all([
this.queue.getWaitingCount(),
this.queue.getActiveCount(),
this.queue.getCompletedCount(),
this.queue.getFailedCount(),
this.queue.getDelayedCount()
]);
return { waiting, active, completed, failed, delayed };
}
}
// Usage Example
const emailQueue = new JobQueue('emails');
// Process emails
emailQueue.process(async (data) => {
const { to, subject, body } = data;
console.log(`Sending email to ${to}`);
await new Promise(r => setTimeout(r, 500));
if (Math.random() < 0.1) throw new Error('SMTP failure');
return { messageId: `msg_${Date.now()}`, sent: true };
}, 5); // 5 concurrent workers
// Add jobs (inside an async function, or an ES module with top-level await)
await emailQueue.add({
to: 'user1@example.com',
subject: 'Welcome!',
body: 'Thanks for signing up'
}, { priority: 10 });
await emailQueue.add({
to: 'user2@example.com',
subject: 'Your order',
body: 'Order #123 confirmed'
}, { delay: 5000 }); // Send after 5 seconds
// Monitor queue
setInterval(async () => {
const stats = await emailQueue.getStats();
console.log('Queue stats:', stats);
}, 5000);
// priority-queue.js
class PriorityJobSystem {
constructor() {
this.highPriority = new Bull('high-priority', { redis: { port: 6379 } });
this.defaultPriority = new Bull('default', { redis: { port: 6379 } });
this.lowPriority = new Bull('low-priority', { redis: { port: 6379 } });
}
async add(type, data, priority = 0) {
let queue;
if (priority >= 8) queue = this.highPriority;
else if (priority >= 3) queue = this.defaultPriority;
else queue = this.lowPriority;
return queue.add({ type, data }, { priority });
}
processAll(handlers) {
const processQueue = (queue) => {
queue.process(async (job) => {
const handler = handlers[job.data.type];
if (!handler) throw new Error(`No handler for "${job.data.type}"`);
return handler(job.data.data);
});
};
processQueue(this.highPriority);
processQueue(this.defaultPriority);
processQueue(this.lowPriority);
}
}
// Usage
const handlers = {
email: async (data) => { /* send email */ },
image: async (data) => { /* process image */ },
report: async (data) => { /* generate report */ }
};
const jobSystem = new PriorityJobSystem();
jobSystem.processAll(handlers);
// Add jobs with different priorities (inside an async context)
await jobSystem.add('email', { to: 'urgent@example.com' }, 10); // Critical
await jobSystem.add('report', { type: 'daily' }, 5); // Normal
await jobSystem.add('image', { filename: 'background.jpg' }, 1); // Low
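The routing thresholds in `add` (priority ≥ 8 goes high, ≥ 3 goes default, everything else goes low) can be pulled out as a pure function, which makes the policy easy to unit-test without a Redis instance:

```javascript
// Pure routing rule mirroring PriorityJobSystem.add.
function routeByPriority(priority) {
  if (priority >= 8) return 'high-priority';
  if (priority >= 3) return 'default';
  return 'low-priority';
}

console.log(routeByPriority(10)); // 'high-priority'
console.log(routeByPriority(5));  // 'default'
console.log(routeByPriority(1));  // 'low-priority'
```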
4. Advanced: Job Workers
// worker-pool.js
const { Worker, isMainThread, parentPort } = require('worker_threads');
class WorkerPool {
constructor(workerFile, poolSize = require('os').cpus().length) {
this.queue = [];
this.workers = [];
this.idle = [];
for (let i = 0; i < poolSize; i++) {
const worker = new Worker(workerFile);
this.workers.push(worker);
this.idle.push(worker);
}
}
runTask(task) {
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.processQueue();
});
}
processQueue() {
// Dispatch only to a genuinely idle worker; indexing workers by an
// active-task counter can hand two tasks to the same thread
if (this.idle.length === 0 || this.queue.length === 0) return;
const { task, resolve, reject } = this.queue.shift();
const worker = this.idle.pop();
const onError = (err) => {
worker.removeListener('message', onMessage);
this.idle.push(worker);
reject(err);
this.processQueue();
};
const onMessage = (result) => {
worker.removeListener('error', onError);
this.idle.push(worker);
resolve(result);
this.processQueue();
};
worker.once('message', onMessage);
worker.once('error', onError);
worker.postMessage(task);
}
async shutdown() {
await Promise.all(this.workers.map(w => w.terminate()));
}
}
// Worker thread code (worker.js)
if (!isMainThread) {
parentPort.on('message', async (task) => {
let result;
switch (task.type) {
case 'cpu-intensive':
result = await heavyComputation(task.data); // app-specific, defined elsewhere
break;
case 'image':
result = await processImage(task.data); // app-specific, defined elsewhere
break;
case 'api':
result = await callApi(task.data); // app-specific, defined elsewhere
break;
default:
result = { error: `Unknown task type: ${task.type}` };
}
parentPort.postMessage(result);
});
}
// Usage (inside an async function)
const pool = new WorkerPool('./worker.js', 4);
const tasks = [
{ type: 'cpu-intensive', data: { iterations: 1000000 } },
{ type: 'image', data: { filename: 'photo.jpg' } },
{ type: 'api', data: { url: 'https://api.example.com' } }
];
const results = await Promise.all(tasks.map(task => pool.runTask(task)));
// strategies.js
class JobStrategies {
static retryExponential(attempt, baseDelay = 1000) {
return baseDelay * Math.pow(2, attempt - 1);
}
static retryLinear(attempt, delay = 5000) {
return attempt * delay;
}
static retryFibonacci(attempt, baseDelay = 1000) {
if (attempt <= 1) return baseDelay;
let a = 0, b = 1;
for (let i = 2; i <= attempt; i++) [a, b] = [b, a + b];
return b * baseDelay;
}
// Concurrency strategies
static concurrencyFixed(activeJobs, maxConcurrent) {
return activeJobs < maxConcurrent;
}
static concurrencyDynamic(resourceUsage) {
// Scale based on CPU/memory
const cpuUsage = resourceUsage.cpu;
const memoryUsage = resourceUsage.memory;
if (cpuUsage < 0.5 && memoryUsage < 0.6) return 10;
if (cpuUsage < 0.7 && memoryUsage < 0.7) return 5;
return 2;
}
// Batching strategies
static batchSize(size, strategy = 'fixed') {
if (strategy === 'fixed') return 100;
if (strategy === 'dynamic') {
return Math.min(1000, Math.max(10, Math.floor(1000 / Math.log(size + 1))));
}
return 50;
}
static shouldBatch(messageRate, queueLength) {
return messageRate > 100 || queueLength > 1000;
}
}
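Plugging small attempt numbers into the retry helpers above shows how the delay curves differ (the two functions are copied from the class so the snippet runs standalone):

```javascript
// Backoff shapes from JobStrategies, in milliseconds.
function retryExponential(attempt, baseDelay = 1000) {
  return baseDelay * Math.pow(2, attempt - 1);
}
function retryFibonacci(attempt, baseDelay = 1000) {
  if (attempt <= 1) return baseDelay;
  let a = 0, b = 1;
  for (let i = 2; i <= attempt; i++) [a, b] = [b, a + b];
  return b * baseDelay;
}

const attempts = [1, 2, 3, 4, 5];
console.log(attempts.map((n) => retryExponential(n))); // [1000, 2000, 4000, 8000, 16000]
console.log(attempts.map((n) => retryFibonacci(n)));   // [1000, 1000, 2000, 3000, 5000]
```

Fibonacci backoff grows more gently than exponential, which can be a better fit when the downstream service recovers quickly.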
// Circuit breaker for failing jobs
class CircuitBreaker {
constructor(failureThreshold = 5, timeout = 60000) {
this.failureThreshold = failureThreshold;
this.timeout = timeout;
this.failures = 0;
this.state = 'CLOSED';
this.lastFailureTime = null;
}
async execute(fn) {
if (this.state === 'OPEN') {
if (Date.now() - this.lastFailureTime > this.timeout) {
this.state = 'HALF_OPEN';
console.log('Circuit half-open, testing...');
} else {
throw new Error('Circuit breaker is OPEN');
}
}
try {
const result = await fn();
if (this.state === 'HALF_OPEN') {
this.state = 'CLOSED';
this.failures = 0;
console.log('Circuit closed, service recovered');
}
return result;
} catch (error) {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.failureThreshold) {
this.state = 'OPEN';
console.log('Circuit opened due to failures');
}
throw error;
}
}
}
// Usage
const breaker = new CircuitBreaker(3, 30000);
async function unreliableJob() {
return breaker.execute(async () => {
if (Math.random() < 0.7) throw new Error('Random failure');
return { success: true };
});
}
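The usage above is randomized; a deterministic walk-through makes the state transitions visible (the class is restated compactly here so the snippet runs on its own):

```javascript
// Compact restatement of the CircuitBreaker above, for a deterministic demo.
class CircuitBreaker {
  constructor(failureThreshold = 5, timeout = 60000) {
    this.failureThreshold = failureThreshold;
    this.timeout = timeout;
    this.failures = 0;
    this.state = 'CLOSED';
    this.lastFailureTime = null;
  }
  async execute(fn) {
    if (this.state === 'OPEN') {
      if (Date.now() - this.lastFailureTime > this.timeout) this.state = 'HALF_OPEN';
      else throw new Error('Circuit breaker is OPEN');
    }
    try {
      const result = await fn();
      if (this.state === 'HALF_OPEN') { this.state = 'CLOSED'; this.failures = 0; }
      return result;
    } catch (error) {
      this.failures++;
      this.lastFailureTime = Date.now();
      if (this.failures >= this.failureThreshold) this.state = 'OPEN';
      throw error;
    }
  }
}

async function demo() {
  const breaker = new CircuitBreaker(2, 60000); // opens after 2 failures
  const failing = async () => { throw new Error('boom'); };
  await breaker.execute(failing).catch(() => {}); // failure 1 -> still CLOSED
  await breaker.execute(failing).catch(() => {}); // failure 2 -> OPEN
  console.log(breaker.state); // 'OPEN'
  // Further calls are rejected immediately, without invoking fn
  const err = await breaker.execute(async () => 'ok').catch((e) => e.message);
  console.log(err); // 'Circuit breaker is OPEN'
}
demo();
```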
5. Best Practices
// production-config.js
const jobConfig = {
// Job Design
idempotency: {
description: 'Jobs should be idempotent - running twice produces same result',
implementation: 'Use jobId as unique key and check before processing',
example: `
const processed = await redis.get(\`processed:\${job.id}\`);
if (processed) return { skipped: true };
await process();
await redis.setex(\`processed:\${job.id}\`, 86400, 'true');
`
},
// Error Handling
errorHandling: {
maxRetries: 3,
backoffType: 'exponential',
deadLetterQueue: true,
alertOnFailure: true,
example: `
try {
await processJob();
} catch (error) {
if (job.attemptsMade < job.opts.attempts) {
throw error; // Will retry
} else {
await moveToDeadLetterQueue(job);
await sendAlert(error);
}
}
`
},
// Monitoring
monitoring: {
metrics: [
'job_duration_seconds',
'job_queue_size',
'job_failures_total',
'active_workers'
],
alertThresholds: {
queueSize: 1000,
failureRate: 0.05, // 5%
jobDuration: 30000 // 30 seconds
}
},
// Resource Management
resourceManagement: {
concurrency: 'Match with CPU cores',
memoryLimit: 'Set max memory per worker',
timeout: 'Always set job timeout',
example: `
const queue = new Bull('jobs', {
settings: {
lockDuration: 30000, // Job lock timeout
stalledInterval: 30000, // Check for stalled jobs
maxStalledCount: 1 // Max stall before failure
}
});
`
},
// Queue Configuration
queueConfig: {
separateQueues: [
'critical', // High priority, low volume
'default', // Normal business logic
'bulk', // Batch processing
'dead-letter' // Failed jobs
],
rateLimiting: {
max: 100,
duration: 60000 // 100 jobs per minute
}
}
};
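A runnable version of the idempotency rule, with an in-memory Set standing in for the `redis.setex` guard shown in the config (the job id is the dedupe key; the function name is invented for the sketch):

```javascript
// In-memory stand-in for the Redis-backed "processed" marker.
const processed = new Set();

async function processOnce(jobId, handler) {
  if (processed.has(jobId)) return { skipped: true }; // duplicate delivery
  const result = await handler();
  processed.add(jobId); // mark only after the handler succeeds
  return result;
}

processOnce('job-42', async () => ({ sent: true }))
  .then(() => processOnce('job-42', async () => ({ sent: true })))
  .then((second) => console.log(second)); // { skipped: true }
```

In production the marker must live in shared storage with a TTL, as in the Redis example, so that every worker sees it.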
// Complete production-ready job setup
class ProductionJobSystem {
constructor() {
this.setupQueues();
this.setupMonitoring();
this.setupHealthChecks();
}
setupQueues() {
this.critical = this.createQueue('critical', {
limiter: { max: 10, duration: 1000 }, // 10 per second
defaultJobOptions: { priority: 10, attempts: 5 }
});
this.default = this.createQueue('default', {
limiter: { max: 100, duration: 1000 }, // 100 per second
defaultJobOptions: { priority: 5, attempts: 3 }
});
this.bulk = this.createQueue('bulk', {
limiter: { max: 10, duration: 60000 }, // 10 per minute
defaultJobOptions: { priority: 1, attempts: 1, timeout: 300000 }
});
}
createQueue(name, options) {
const queue = new Bull(name, {
redis: process.env.REDIS_URL,
...options,
settings: {
lockDuration: 60000,
stalledInterval: 30000,
maxStalledCount: 2
}
});
// Event handlers
queue.on('failed', (job, err) => this.handleFailure(name, job, err));
queue.on('stalled', (job) => this.handleStall(name, job));
queue.on('completed', (job) => this.recordMetric('job.completed', 1));
return queue;
}
async handleFailure(queueName, job, error) {
console.error(`Job ${job.id} failed in ${queueName}:`, error);
// Record metric
this.recordMetric('job.failed', 1, { queue: queueName });
// Send alert if failure rate high
const failureRate = await this.getFailureRate(queueName);
if (failureRate > 0.1) {
await this.sendAlert(`High failure rate in ${queueName}: ${failureRate}`);
}
// Move to dead letter if max attempts reached
if (job.attemptsMade >= job.opts.attempts) {
await this.moveToDeadLetter(job, error);
}
}
async gracefulShutdown() {
console.log('Shutting down job system...');
// Stop accepting new jobs
await Promise.all([
this.critical.pause(),
this.default.pause(),
this.bulk.pause()
]);
// Wait for current jobs to finish
await this.waitForActiveJobs();
// Close connections
await Promise.all([
this.critical.close(),
this.default.close(),
this.bulk.close()
]);
console.log('Job system shutdown complete');
}
waitForActiveJobs() {
return new Promise(resolve => {
const checkInterval = setInterval(async () => {
const active = await this.getActiveJobCount();
if (active === 0) {
clearInterval(checkInterval);
resolve();
}
}, 1000);
});
}
}
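The shutdown sequence above (pause intake, drain active jobs, close connections) can be expressed as a reusable helper. The queue objects here are stubs exposing the same pause/getActiveCount/close surface assumed by the class:

```javascript
// Pause -> drain -> close, generalized over any queue-like objects.
async function shutdownQueues(queues, { pollMs = 50 } = {}) {
  await Promise.all(queues.map((q) => q.pause()));   // stop intake
  for (const q of queues) {                          // wait for active jobs
    while ((await q.getActiveCount()) > 0) {
      await new Promise((r) => setTimeout(r, pollMs));
    }
  }
  await Promise.all(queues.map((q) => q.close()));   // release connections
}

// Stub queue whose single "active job" finishes shortly after pause
function stubQueue() {
  let active = 1;
  return {
    calls: [],
    pause() {
      this.calls.push('pause');
      setTimeout(() => { active = 0; }, 20); // job drains after 20ms
      return Promise.resolve();
    },
    getActiveCount() { return Promise.resolve(active); },
    close() { this.calls.push('close'); return Promise.resolve(); },
  };
}

const q = stubQueue();
shutdownQueues([q]).then(() => console.log(q.calls)); // ['pause', 'close']
```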
// checklist.js
const bestPractices = {
// ✅ Idempotency
idempotency: `
Use unique job IDs and track processed jobs
await redis.set(\`processed:\${job.id}\`, 'done', 'NX')
`,
// ✅ Small Job Size
jobSize: `
Keep jobs small (< 1MB)
Store large data in external storage (S3, DB)
Pass references (URLs, IDs) not actual data
`,
// ✅ Timeouts
timeouts: `
Always set job timeout
{ timeout: 30000 } // 30 seconds
Monitor and kill long-running jobs
`,
// ✅ Monitoring
monitoring: `
Track: queue length, job duration, failure rate
Alert on anomalies
Dashboard for visibility
`,
// ✅ Graceful Shutdown
gracefulShutdown: `
Stop accepting new jobs
Wait for current jobs to finish
Close connections properly
`,
// ✅ Error Handling
errorHandling: `
Catch and log errors
Implement retries with backoff
Dead letter queue for failures
Send alerts for critical failures
`,
// ✅ Resource Limits
resourceLimits: `
Set concurrency limits
Monitor memory usage
Implement circuit breakers
Rate limit external calls
`,
// ✅ Testing
testing: `
Unit test job handlers
Integration test queues
Test retry logic
Test failure scenarios
`
};
// Health check endpoint (assumes an Express `app` and queue stat helpers)
app.get('/health/jobs', async (req, res) => {
const stats = {
critical: await getQueueStats('critical'),
default: await getQueueStats('default'),
bulk: await getQueueStats('bulk'),
deadLetter: await getDeadLetterCount()
};
const isHealthy =
stats.critical.waiting < 100 &&
stats.default.waiting < 1000 &&
stats.deadLetter < 50;
res.status(isHealthy ? 200 : 500).json({
status: isHealthy ? 'healthy' : 'degraded',
stats
});
});
Summary
| Concept | Key Takeaway |
|---|---|
| When to use | Defer non-critical, time-consuming operations |
| Queue choice | Bull for Redis, Agenda for MongoDB, RabbitMQ for enterprise |
| Retry strategy | Exponential backoff with max attempts |
| Monitoring | Track queue length, job duration, failure rate |
| Idempotency | Essential for reliable job processing |
| Graceful shutdown | Always wait for active jobs to complete |
# Install Bull
npm install bull ioredis
# Run Redis
docker run -p 6379:6379 redis
# Queue dashboard: mount bull-board inside your Express app
npm install @bull-board/express
10 Interview Questions + 10 MCQs
1. What is the main purpose of background jobs? (easy)
Answer: Move expensive, non-critical work out of the request-response cycle to improve UX and resilience.
2. What is a dead letter queue? (easy)
Answer: A queue that stores permanently failed jobs after max retries.
3. Why is idempotency critical in job processing? (medium)
Answer: Retries and duplicate deliveries can happen; idempotency avoids duplicate side effects.
4. When would you use worker threads? (medium)
Answer: For CPU-intensive tasks, to keep the event loop responsive.
5. What does exponential backoff solve? (medium)
Answer: It reduces retry storms by spacing retries progressively further apart.
6. How does Bull achieve persistence? (medium)
Answer: It stores queue and job state in Redis.
7. Why separate critical/default/bulk queues? (hard)
Answer: Isolation prevents low-priority load from starving critical jobs.
8. What should a graceful shutdown do for jobs? (hard)
Answer: Pause intake, let active jobs finish or time out, then close connections.
9. Which metrics are essential for queue monitoring? (easy)
Answer: Queue depth, job duration, failure rate, retries, active workers.
10. How does a circuit breaker improve reliability? (hard)
Answer: It stops repeated calls to an unstable dependency and allows controlled recovery.
10 Background Jobs MCQs
1. Background jobs are best for:
Explanation: Background jobs defer long-running work away from request threads.
2. Bull uses which backend?
Explanation: Bull is Redis-based.
3. A dead letter queue stores:
Explanation: The DLQ captures jobs that exceeded their retry limits.
4. Best retry strategy for unstable APIs:
Explanation: Backoff reduces pressure and retry storms.
5. An idempotent job means:
Explanation: Duplicate execution should not duplicate side effects.
6. Worker threads are ideal for:
Explanation: They offload CPU work from the event loop.
7. Queue priority helps by:
Explanation: Priority lets critical tasks run ahead of lower-priority tasks.
8. A graceful shutdown should:
Explanation: It prevents job loss and inconsistent state.
9. A circuit breaker protects against:
Explanation: It opens to stop repeated calls to a failing dependency.
10. The most important operational metric:
Explanation: Backlog and failure counts indicate system health quickly.