Features
Docs
CLI
Benchmarks
Examples

© 2024 MoroJs

Message Queue

Production-ready message queue system with support for multiple adapters. Process background jobs with Bull, RabbitMQ, AWS SQS, Kafka, or in-memory queues.

Message Queues That Just Work

Process background jobs with any adapter.
Bull, RabbitMQ, AWS SQS, Kafka, or in-memory. Same API.

It's This Simple

Set up a queue and process jobs

typescript

1import { createApp } from '@morojs/moro';
2
3const app = createApp();
4
5// Configure queue
6app.queueInit('emails', {
7  adapter: 'bull',
8  connection: { host: 'localhost', port: 6379 },
9  concurrency: 5
10});
11
12// Process jobs
13await app.processQueue('emails', async (job) => {
14  await sendEmail(job.data);
15  return { sent: true };
16});
17
18// Add jobs to queue
19app.post('/send-email').handler(async (req, res) => {
20  await app.addToQueue('emails', {
21    to: req.body.email,
22    subject: 'Welcome',
23    body: 'Welcome to our platform!'
24  });
25  return { queued: true };
26});

Why Message Queues Matter

Without proper queue systems, you're manually managing job processing, handling failures, and dealing with different queue APIs. With MoroJS, you get all of that automatically.

Traditional queue setup requires different libraries and APIs for each adapter. We give you one consistent API.

Without Queue System

  • Different APIs for each queue system
  • Manual retry logic
  • No dead letter queue handling
  • Complex adapter switching

With MoroJS

  • Same API across all adapters
  • Automatic retry with backoff
  • Built-in dead letter queue
  • Easy adapter switching

It's This Easy

Use the same API regardless of which adapter you choose.

Same API, different adapters

typescript

1// Memory adapter (development)
2app.queueInit('tasks', { adapter: 'memory' });
3
4// Bull adapter (Redis)
5app.queueInit('tasks', { 
6  adapter: 'bull',
7  connection: { host: 'localhost', port: 6379 }
8});
9
10// RabbitMQ adapter
11app.queueInit('tasks', { 
12  adapter: 'rabbitmq',
13  connection: { host: 'localhost', port: 5672 }
14});
15
16// All use the same API
17await app.addToQueue('tasks', { data: 'value' });
18await app.processQueue('tasks', async (job) => { /* ... */ });

Why It Makes Sense

Flexible

Choose Bull, RabbitMQ, SQS, Kafka, or Memory. Same API.

Production-Ready

Retries, dead letter queues, priority queues, and metrics included.

Consistent

Same API across all adapters. Switch between them easily.

How It Works

MoroJS provides a unified queue API that works with multiple adapters. You configure a queue with your chosen adapter, add jobs to it, and process them with handlers. The system handles retries, dead letter queues, priority, and monitoring automatically.

Quick Start

Basic Queue Setup

typescript

1import { createApp } from '@morojs/moro';
2
3const app = createApp();
4
5// Configure queue - synchronous, no await needed!
6app.queueInit('emails', {
7  adapter: 'bull',
8  connection: {
9    host: 'localhost',
10    port: 6379
11  },
12  concurrency: 5,
13  defaultJobOptions: {
14    removeOnComplete: true,
15    removeOnFail: false,
16    attempts: 3,
17    backoff: {
18      type: 'exponential',
19      delay: 2000
20    }
21  }
22});
23
24// Process jobs
25await app.processQueue('emails', async (job) => {
26  await sendEmail(job.data);
27  return { sent: true };
28});
29
30// Add jobs to queue
31app.post('/send-email').handler(async (req, res) => {
32  await app.addToQueue('emails', {
33    to: req.body.email,
34    subject: 'Welcome',
35    body: 'Welcome to our platform!'
36  });
37
38  return { queued: true };
39});
40
41app.listen(3000);

Queue Adapters

Memory Adapter (Development)

typescript

1// In-memory queue for development and testing
2app.queueInit('tasks', {
3  adapter: 'memory',
4  concurrency: 5
5});
6
7// No external dependencies required
8// Data lost on restart

Bull Adapter (Redis)

typescript

1// Redis-based queue with persistence
2app.queueInit('tasks', {
3  adapter: 'bull',
4  connection: {
5    host: 'localhost',
6    port: 6379,
7    password: 'redis-password'
8  },
9  concurrency: 10
10});
11
12// Features:
13// - Persistent storage
14// - Job scheduling and delays
15// - Priority queues
16// - Rate limiting
17// - Retry with exponential backoff

RabbitMQ Adapter

typescript

1// AMQP-based message broker
2app.queueInit('tasks', {
3  adapter: 'rabbitmq',
4  connection: {
5    host: 'localhost',
6    port: 5672,
7    username: 'guest',
8    password: 'guest'
9  },
10  concurrency: 20
11});
12
13// Features:
14// - Message persistence
15// - Routing and exchanges
16// - Acknowledgments
17// - Dead letter exchanges

AWS SQS Adapter

typescript

1// Amazon Simple Queue Service
2app.queueInit('tasks', {
3  adapter: 'sqs',
4  connection: {
5    region: 'us-east-1',
6    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
7  },
8  concurrency: 10
9});
10
11// Features:
12// - Managed service (no infrastructure)
13// - Automatic scaling
14// - Message retention (up to 14 days)
15// - FIFO queues available

Job Management

Adding Jobs

typescript

1// Single job
2const jobId = await app.addToQueue('emails', {
3  to: 'user@example.com',
4  template: 'welcome'
5});
6
7// Bulk jobs
8const jobIds = await app.addBulkToQueue('notifications', [
9  { data: { userId: 1, message: 'Update available' }, options: { priority: 10 } },
10  { data: { userId: 2, message: 'New features' }, options: { priority: 5 } },
11  { data: { userId: 3, message: 'Maintenance scheduled' }, options: { delay: 60000 } }
12]);
13
14// Job with options
15await app.addToQueue('reports',
16  { reportId: 123, format: 'pdf' },
17  {
18    priority: 10,           // Higher priority
19    delay: 60000,           // Delay 1 minute
20    attempts: 3,            // Retry 3 times
21    backoff: {
22      type: 'exponential',
23      delay: 5000
24    },
25    timeout: 30000          // 30 second timeout
26  }
27);

Job Processing with Progress

typescript

1await app.processQueue('video-encoding', async (job) => {
2  const { videoId, format } = job.data;
3
4  // Report progress
5  await job.updateProgress(0);
6
7  const video = await loadVideo(videoId);
8  await job.updateProgress(25);
9
10  const encoded = await encodeVideo(video, format);
11  await job.updateProgress(75);
12
13  await saveVideo(encoded);
14  await job.updateProgress(100);
15
16  return { videoId, format, size: encoded.size };
17});

Monitoring & Metrics

Queue Statistics

typescript

1// Get queue metrics
2const stats = await app.getQueueStats('emails');
3console.log(`Waiting: ${stats.waiting}`);
4console.log(`Active: ${stats.active}`);
5console.log(`Completed: ${stats.completed}`);
6console.log(`Failed: ${stats.failed}`);
7console.log(`Delayed: ${stats.delayed}`);
8
9// Expose metrics endpoint
10app.get('/metrics/queues', async (req, res) => {
11  const stats = await app.getQueueStats('emails');
12  return stats;
13});

Event Monitoring

typescript

1// Listen to queue events
2app.on('queue:job:completed', (event) => {
3  console.log(`Job ${event.jobId} completed`);
4});
5
6app.on('queue:job:failed', (event) => {
7  console.error(`Job ${event.jobId} failed:`, event.error);
8});
9
10app.on('queue:job:progress', (event) => {
11  console.log(`Job ${event.jobId} progress: ${event.progress}%`);
12});

Best Practices

1. Choose the Right Adapter

  • Development: Use memory adapter
  • Production (Simple): Use bull with Redis
  • Production (High Scale): Use kafka or rabbitmq
  • AWS Infrastructure: Use sqs

2. Set Appropriate Concurrency

1// CPU-bound tasks: lower concurrency
2app.queueInit('video-encoding', {
3  adapter: 'bull',
4  concurrency: 2  // Based on CPU cores
5});
6
7// I/O-bound tasks: higher concurrency
8app.queueInit('api-calls', {
9  adapter: 'bull',
10  concurrency: 50  // Many concurrent I/O operations
11});

3. Configure Retries Properly

1app.queueInit('payments', {
2  adapter: 'bull',
3  defaultJobOptions: {
4    attempts: 3,           // Limited retries for critical operations
5    backoff: {
6      type: 'exponential',
7      delay: 5000
8    }
9  }
10});

Next Steps