Bull / BullMQ Integration
NestLens integrates with Bull and BullMQ job queues to track job lifecycle, monitor performance, and debug queue processing issues.
Overview
The Job Watcher tracks:
- Job additions to queues
- Job state transitions (waiting → active → completed/failed)
- Processing duration
- Retry attempts
- Job failures and errors
- Delayed jobs
Setup
1. Install Bull or BullMQ
# For Bull
npm install @nestjs/bull bull
# For BullMQ (recommended)
npm install @nestjs/bullmq bullmq
2. Enable Job Watcher
// app.module.ts
import { NestLensModule } from 'nestlens';
@Module({
imports: [
NestLensModule.forRoot({
watchers: {
job: true, // Enable job tracking
},
}),
],
})
export class AppModule {}
3. Register Your Queues
For Bull (Classic)
// app.module.ts or queue.module.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { JobWatcher } from 'nestlens';
@Injectable()
export class QueueRegistration implements OnModuleInit {
constructor(
@InjectQueue('email') private emailQueue: Queue,
@InjectQueue('notifications') private notificationQueue: Queue,
private jobWatcher: JobWatcher,
) {}
async onModuleInit() {
// Register Bull queues with NestLens
this.jobWatcher.setupQueue(this.emailQueue, 'email');
this.jobWatcher.setupQueue(this.notificationQueue, 'notifications');
}
}
For BullMQ
// app.module.ts or queue.module.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { JobWatcher } from 'nestlens';
@Injectable()
export class QueueRegistration implements OnModuleInit, OnModuleDestroy {
constructor(
@InjectQueue('email') private emailQueue: Queue,
@InjectQueue('notifications') private notificationQueue: Queue,
private jobWatcher: JobWatcher,
) {}
async onModuleInit() {
// Register BullMQ queues with NestLens (auto-creates QueueEvents)
await this.jobWatcher.setupBullMQQueue(this.emailQueue, 'email');
await this.jobWatcher.setupBullMQQueue(this.notificationQueue, 'notifications');
}
async onModuleDestroy() {
// Clean up QueueEvents connections
await this.jobWatcher.closeQueueEvents();
}
}
Advanced: Manual QueueEvents management
If you need more control over the QueueEvents lifecycle, use setupQueueWithEvents:
import { Queue, QueueEvents } from 'bullmq';
@Injectable()
export class QueueRegistration implements OnModuleInit, OnModuleDestroy {
private queueEvents: QueueEvents;
constructor(
@InjectQueue('email') private emailQueue: Queue,
private jobWatcher: JobWatcher,
) {}
async onModuleInit() {
// Create QueueEvents manually with custom options
const connection = (await this.emailQueue.client).options;
this.queueEvents = new QueueEvents('email', { connection });
// Register with NestLens
this.jobWatcher.setupQueueWithEvents(this.emailQueue, this.queueEvents, 'email');
}
async onModuleDestroy() {
await this.queueEvents?.close();
}
}
Alternative: Provider Token (Bull Classic Only)
Use the NESTLENS_BULL_QUEUES token to provide Bull (classic) queues:
import { NESTLENS_BULL_QUEUES } from 'nestlens';
import { getQueueToken } from '@nestjs/bull';
@Module({
providers: [
{
provide: NESTLENS_BULL_QUEUES,
useFactory: (
emailQueue: Queue,
notificationQueue: Queue,
) => [
{ queue: emailQueue, name: 'email' },
{ queue: notificationQueue, name: 'notifications' },
],
inject: [
getQueueToken('email'),
getQueueToken('notifications'),
],
},
],
})
export class AppModule {}
API Reference
JobWatcher Methods
| Method | Description |
|---|---|
setupQueue(queue, queueName?) | Register a Bull (classic) queue for tracking |
setupBullMQQueue(queue, queueName?) | Register a BullMQ queue (auto-creates QueueEvents) |
setupQueueWithEvents(queue, queueEvents, queueName?) | Register a BullMQ queue with manual QueueEvents |
closeQueueEvents() | Close all QueueEvents created by setupBullMQQueue |
Key Differences
| Feature | Bull Classic | BullMQ |
|---|---|---|
| Setup method | setupQueue() | setupBullMQQueue() |
| Event source | Queue instance | QueueEvents (auto-created) |
| Cleanup | None required | Call closeQueueEvents() in onModuleDestroy |
Tracked Events
NestLens monitors all Bull/BullMQ queue events:
1. Waiting
Job added to queue, waiting for processing
{
type: 'job',
payload: {
name: 'send-email',
queue: 'email',
data: { to: 'user@example.com', subject: '...' },
status: 'waiting',
attempts: 0
}
}
2. Active
Job picked up by worker and processing started
{
type: 'job',
payload: {
name: 'send-email',
queue: 'email',
data: { to: 'user@example.com' },
status: 'active',
attempts: 0
}
}
3. Completed
Job finished successfully
{
type: 'job',
payload: {
name: 'send-email',
queue: 'email',
data: { to: 'user@example.com' },
status: 'completed',
attempts: 0,
duration: 1250, // milliseconds
result: { messageId: 'abc123' }
}
}
4. Failed
Job processing failed
{
type: 'job',
payload: {
name: 'send-email',
queue: 'email',
data: { to: 'user@example.com' },
status: 'failed',
attempts: 1,
duration: 500,
error: 'SMTP connection failed'
}
}
5. Delayed
Job scheduled for later execution
{
type: 'job',
payload: {
name: 'send-reminder',
queue: 'notifications',
data: { userId: 123 },
status: 'delayed',
attempts: 0
}
}
Configuration
Job Watcher Config
interface JobWatcherConfig {
enabled?: boolean;
}
// In your NestLens config
NestLensModule.forRoot({
watchers: {
job: {
enabled: true,
},
},
})
Queue-Specific Settings
Bull/BullMQ settings still apply:
// Configure Bull queue
BullModule.registerQueue({
name: 'email',
redis: {
host: 'localhost',
port: 6379,
},
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
})
NestLens will track all configured queues.
Dashboard Features
Job Filtering
Filter jobs in the dashboard:
- Queue Name - Filter by specific queue
- Status - waiting, active, completed, failed, delayed
- Time Range - View jobs from specific periods
- Job Name - Filter by job type
Job Metrics
View important metrics:
- Processing Time - How long jobs take
- Failure Rate - Percentage of failed jobs
- Retry Count - Number of retry attempts
- Queue Length - Jobs waiting per queue
- Active Jobs - Currently processing jobs
Job Details
Click any job to view:
- Complete job data (payload)
- Result or error details
- Retry history
- Processing duration
- Queue name and configuration
Use Cases
1. Debug Failed Jobs
Identify why jobs are failing:
// In dashboard:
// 1. Filter by status: 'failed'
// 2. Filter by queue: 'email'
// 3. View error messages
// 4. Check job data for patterns
2. Monitor Job Performance
Track slow jobs:
// In dashboard:
// 1. Sort jobs by duration
// 2. Identify slow job types
// 3. Optimize processing logic
3. Track Retry Patterns
Understand retry behavior:
// In dashboard:
// 1. Filter by attempts > 1
// 2. Identify jobs that frequently retry
// 3. Adjust retry configuration
4. Queue Health Monitoring
Monitor queue status:
// In dashboard:
// 1. View jobs per queue
// 2. Check for stuck jobs (long active time)
// 3. Identify queue bottlenecks
Example Implementations
Email Queue
// email.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('email')
export class EmailProcessor {
@Process('send-email')
async handleSendEmail(job: Job) {
const { to, subject, body } = job.data;
// NestLens automatically tracks this
await this.emailService.send({ to, subject, body });
return { messageId: 'abc123', sent: true };
}
}
Notification Queue with Delays
// notification.service.ts
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
export class NotificationService {
constructor(
@InjectQueue('notifications') private queue: Queue
) {}
async scheduleReminder(userId: number, delayMs: number) {
// NestLens tracks this delayed job
await this.queue.add(
'send-reminder',
{ userId },
{ delay: delayMs }
);
}
}
Report Generation Queue
// report.processor.ts
@Processor('reports')
export class ReportProcessor {
@Process({ name: 'generate-report', concurrency: 2 })
async handleGenerateReport(job: Job) {
// Long-running job tracked by NestLens
const report = await this.reportService.generate(job.data);
// Update progress (also tracked)
job.progress(100);
return report;
}
}
Best Practices
1. Limit Job Data Size
Keep job payloads small:
// Good - minimal data
await queue.add('process-order', { orderId: 123 });
// Bad - large payload
await queue.add('process-order', {
orderId: 123,
fullOrderData: { /* huge object */ }
});
NestLens truncates data over 64KB, but smaller is better.
2. Use Descriptive Job Names
Make debugging easier:
// Good
await queue.add('send-welcome-email', data);
await queue.add('generate-monthly-report', data);
// Bad
await queue.add('job1', data);
await queue.add('task', data);
3. Handle Errors Gracefully
Provide error context:
@Process('send-email')
async handleSendEmail(job: Job) {
try {
await this.emailService.send(job.data);
} catch (error) {
// NestLens captures this error message
throw new Error(`Failed to send email to ${job.data.to}: ${error.message}`);
}
}
4. Monitor Queue Health
Set up alerts based on NestLens data:
- High failure rate → Alert team
- Long active duration → Possible stuck jobs
- Many delayed jobs → Queue backed up
5. Use Queue-Specific Prefixes
Organize jobs with naming conventions:
// Queue: email
'email:welcome'
'email:reset-password'
'email:notification'
// Queue: reports
'report:daily-sales'
'report:monthly-summary'
Troubleshooting
Jobs Not Appearing
Issue: Jobs not tracked in NestLens
Solutions:
-
Verify Job Watcher Enabled:
watchers: { job: true } -
Check Queue Registration:
// Make sure setupQueue was called
this.jobWatcher.setupQueue(this.queue, 'queue-name'); -
Verify Queue Injection:
// Ensure queue is properly injected
constructor(
@InjectQueue('email') private emailQueue: Queue,
private jobWatcher: JobWatcher,
) {} -
For BullMQ - Use setupBullMQQueue:
// Simplest approach - auto-creates QueueEvents
await this.jobWatcher.setupBullMQQueue(this.emailQueue, 'email');
Incomplete Job Data
Issue: Missing job data or results
Solutions:
- Data Size Limit - NestLens truncates data > 64KB
- Serialization Issues - Ensure job data is JSON-serializable
- Check Job Result - Verify processor returns data
Performance Impact
Issue: Job processing slower with NestLens
Solutions:
- Minimal Overhead - NestLens adds ~0.1ms per event
- Async Tracking - All logging is non-blocking
- Buffer System - Entries batched for efficiency
If concerned:
- Disable in production
- Use filtering to exclude specific queues
Performance Considerations
Job tracking overhead:
- Per Job Event: ~0.1-0.2ms
- Memory: Minimal (buffered)
- Redis Impact: None (no additional Redis calls)
Production recommendations:
- Monitor initial overhead
- Disable for high-volume queues if needed
- Use filtering to exclude verbose jobs
Next Steps
- Learn about Redis Integration
- Explore Custom Integrations
- Configure Advanced Filtering