Task-Scheduling und Background-Job-Processing sind essenziell für moderne Webanwendungen, die zeitgesteuerte Aufgaben, asynchrone Verarbeitung und Load-Balancing benötigen. NestJS bietet umfassende Unterstützung für Cron-Jobs, Queue-basierte Background-Jobs und robuste Job-Processing-Mechanismen. Dieses Kapitel zeigt, wie Sie skalierbare und zuverlässige Task-Processing-Systeme implementieren.
Das Paket @nestjs/schedule ermöglicht es, zeitgesteuerte
Aufgaben mit Cron-Syntax und anderen Scheduling-Optionen zu
implementieren.
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { TasksService } from './tasks.service';
/**
 * Feature module that activates NestJS task scheduling and provides
 * {@link TasksService}, whose decorated methods are the scheduled jobs.
 *
 * `ScheduleModule.forRoot()` is called without arguments: the keys that were
 * passed here before (`timezone`, `errorHandler`, `maxConcurrentJobs`,
 * `gracefulShutdown`) are not documented ScheduleModule options, so they had
 * no effect at best and fail type-checking at worst. Time zones are set per
 * job through the `timeZone` option of `@Cron`, and failures are handled
 * inside the individual job handlers.
 */
@Module({
  imports: [ScheduleModule.forRoot()],
  providers: [TasksService],
})
export class TasksModule {}
import { Injectable, Logger } from '@nestjs/common';
import { promises as fs } from 'fs';
import * as path from 'path';
import { Cron, CronExpression, Interval, Timeout, SchedulerRegistry } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
/**
 * Hosts the application's scheduled work: cron jobs, fixed intervals, a
 * one-off startup timeout, and jobs/intervals registered at runtime through
 * the SchedulerRegistry. Execution statistics of the cron jobs are kept in an
 * in-memory map and exposed through {@link getTaskMetrics}.
 */
@Injectable()
export class TasksService {
  private readonly logger = new Logger(TasksService.name);
  /** Per-task execution statistics keyed by task name (in memory only). */
  private readonly taskMetrics = new Map<string, TaskMetrics>();

  constructor(
    @InjectRepository(User) private userRepository: Repository<User>,
    @InjectRepository(Order) private orderRepository: Repository<Order>,
    @InjectRepository(Report) private reportRepository: Repository<Report>,
    private schedulerRegistry: SchedulerRegistry,
    private emailService: EmailService,
    private analyticsService: AnalyticsService,
  ) {}

  // Basic Cron Jobs

  /**
   * Daily at 02:00 (Europe/Berlin): clears expired sessions, removes stale
   * temp files and archives old logs. Unlike the other cron handlers, a
   * failure is re-thrown after it has been recorded and logged.
   */
  @Cron('0 2 * * *', {
    name: 'daily-cleanup',
    timeZone: 'Europe/Berlin',
  })
  async handleDailyCleanup(): Promise<void> {
    await this.runTrackedTask(
      'daily-cleanup',
      {
        start: 'Starting daily cleanup task',
        success: 'Daily cleanup task completed successfully',
        failure: 'Daily cleanup task failed',
      },
      async () => {
        await this.cleanupExpiredSessions();
        await this.cleanupTempFiles();
        await this.archiveOldLogs();
      },
      { rethrow: true },
    );
  }

  /** Every hour: aggregates the last hour of user and order activity into a performance report. */
  @Cron(CronExpression.EVERY_HOUR, {
    name: 'hourly-analytics',
  })
  async handleHourlyAnalytics(): Promise<void> {
    await this.runTrackedTask(
      'hourly-analytics',
      {
        start: 'Starting hourly analytics task',
        success: 'Hourly analytics task completed',
        failure: 'Hourly analytics task failed',
      },
      async () => {
        const now = new Date();
        const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);
        const userMetrics = await this.aggregateUserActivity(oneHourAgo, now);
        const orderMetrics = await this.processOrderStatistics(oneHourAgo, now);
        await this.generatePerformanceReport(userMetrics, orderMetrics, oneHourAgo);
      },
    );
  }

  // Weekly Report Generation

  /** Mondays at 09:00 (Europe/Berlin): builds the four weekly reports concurrently and mails them. */
  @Cron('0 9 * * 1', {
    name: 'weekly-reports',
    timeZone: 'Europe/Berlin',
  })
  async generateWeeklyReports(): Promise<void> {
    await this.runTrackedTask(
      'weekly-reports',
      {
        start: 'Starting weekly report generation',
        success: 'Weekly reports generated and distributed',
        failure: 'Weekly report generation failed',
      },
      async () => {
        const now = new Date();
        const oneWeekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);
        const reports = await Promise.all([
          this.generateUserActivityReport(oneWeekAgo, now),
          this.generateSalesReport(oneWeekAgo, now),
          this.generatePerformanceReport(null, null, oneWeekAgo),
          this.generateErrorReport(oneWeekAgo, now),
        ]);
        await this.distributeReports(reports);
      },
    );
  }

  // Monthly Maintenance

  /** First day of each month at 03:00: runs the maintenance steps one after another. */
  @Cron('0 3 1 * *', {
    name: 'monthly-maintenance',
  })
  async performMonthlyMaintenance(): Promise<void> {
    await this.runTrackedTask(
      'monthly-maintenance',
      {
        start: 'Starting monthly maintenance',
        success: 'Monthly maintenance completed',
        failure: 'Monthly maintenance failed',
      },
      async () => {
        await this.performDatabaseMaintenance();
        await this.updateUserStatistics();
        await this.archiveOldData();
        await this.optimizeDatabaseIndexes();
      },
    );
  }

  // Interval-based Tasks

  /** Every 30 seconds: collects health metrics, alerts admins on critical issues, updates the dashboard. */
  @Interval('system-health-check', 30000)
  async performSystemHealthCheck(): Promise<void> {
    try {
      const healthMetrics = await this.collectSystemHealthMetrics();
      if (healthMetrics.criticalIssues.length > 0) {
        await this.alertSystemAdmins(healthMetrics.criticalIssues);
      }
      await this.updateHealthDashboard(healthMetrics);
    } catch (error: unknown) {
      this.logger.error('System health check failed', TasksService.toError(error).message);
    }
  }

  /** Every 5 minutes: preloads popular content and refreshes cached statistics. */
  @Interval('cache-warm-up', 300000)
  async warmUpCache(): Promise<void> {
    try {
      await this.preloadPopularContent();
      await this.refreshCachedStats();
      this.logger.debug('Cache warm-up completed');
    } catch (error: unknown) {
      this.logger.error('Cache warm-up failed', TasksService.toError(error).message);
    }
  }

  // Timeout-based One-time Tasks

  /** Runs once, 5 seconds after startup. */
  @Timeout('startup-tasks', 5000)
  async handleStartupTasks(): Promise<void> {
    try {
      this.logger.log('Executing startup tasks');
      await this.initializeSystemComponents();
      await this.validateSystemConfiguration();
      await this.loadInitialData();
      this.logger.log('Startup tasks completed');
    } catch (error: unknown) {
      this.logger.error('Startup tasks failed', TasksService.toError(error).message);
    }
  }

  // Dynamic Job Management

  /**
   * Registers and starts a cron job at runtime.
   *
   * @param name Registry key of the job.
   * @param cronExpression Cron pattern for the job.
   * @param handler Callback executed on every tick.
   * @param options Only `timeZone` is applied; `startDate` and `endDate` are
   *   accepted for compatibility but are not used by this implementation.
   * @throws Whatever `SchedulerRegistry.addCronJob` throws (e.g. for a name
   *   that is already registered).
   */
  async addDynamicJob(
    name: string,
    cronExpression: string,
    handler: () => Promise<void>,
    options?: {
      timeZone?: string;
      startDate?: Date;
      endDate?: Date;
    }
  ): Promise<void> {
    // NOTE(review): CronJob is not imported in the visible part of this file;
    // it presumably comes from the 'cron' package — confirm.
    // The job is created stopped and started only after registration
    // succeeded. Starting it in the constructor would leave an unregistered
    // job running if addCronJob() rejects the name.
    const job = new CronJob(cronExpression, handler, null, false, options?.timeZone);
    this.schedulerRegistry.addCronJob(name, job);
    job.start();
    this.logger.log(`Dynamic cron job '${name}' added with expression '${cronExpression}'`);
  }

  /** Stops and unregisters a cron job by name. */
  async removeDynamicJob(name: string): Promise<void> {
    this.schedulerRegistry.deleteCronJob(name);
    this.logger.log(`Dynamic cron job '${name}' removed`);
  }

  /**
   * Registers an interval at runtime. Errors raised by `handler` are logged
   * instead of surfacing as unhandled rejections.
   *
   * @throws Whatever `SchedulerRegistry.addInterval` throws; the timer is
   *   cleared first so it cannot keep firing unregistered.
   */
  async addDynamicInterval(
    name: string,
    milliseconds: number,
    handler: () => Promise<void>
  ): Promise<void> {
    const interval = setInterval(async () => {
      try {
        await handler();
      } catch (error: unknown) {
        this.logger.error(`Dynamic interval '${name}' failed`, TasksService.toError(error).message);
      }
    }, milliseconds);
    try {
      this.schedulerRegistry.addInterval(name, interval);
    } catch (error: unknown) {
      clearInterval(interval);
      throw error;
    }
    this.logger.log(`Dynamic interval '${name}' added with ${milliseconds}ms interval`);
  }

  /** Clears and unregisters an interval by name. */
  async removeDynamicInterval(name: string): Promise<void> {
    this.schedulerRegistry.deleteInterval(name);
    this.logger.log(`Dynamic interval '${name}' removed`);
  }

  // Task Implementation Methods

  /** Nulls the session token and expiry of every user whose session expiry lies in the past. */
  private async cleanupExpiredSessions(): Promise<void> {
    const expiredSessions = await this.userRepository
      .createQueryBuilder('user')
      .where('user.sessionExpiry < :now', { now: new Date() })
      .getMany();
    for (const user of expiredSessions) {
      user.sessionToken = null;
      user.sessionExpiry = null;
      await this.userRepository.save(user);
    }
    this.logger.log(`Cleaned up ${expiredSessions.length} expired sessions`);
  }

  /**
   * Deletes regular files in `<cwd>/temp` whose modification time is older
   * than 24 hours. Best effort: any failure is logged as a warning and not
   * propagated.
   */
  private async cleanupTempFiles(): Promise<void> {
    const tempDir = path.join(process.cwd(), 'temp');
    const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;
    try {
      const files = await fs.readdir(tempDir);
      let deletedCount = 0;
      for (const file of files) {
        const filePath = path.join(tempDir, file);
        const stats = await fs.stat(filePath);
        // unlink() rejects for directories, which previously aborted the
        // whole run at the first old sub-directory; skip non-files instead.
        if (stats.isFile() && stats.mtimeMs < oneDayAgo) {
          await fs.unlink(filePath);
          deletedCount++;
        }
      }
      this.logger.log(`Cleaned up ${deletedCount} temporary files`);
    } catch (error: unknown) {
      this.logger.warn('Temp file cleanup failed', TasksService.toError(error).message);
    }
  }

  /** Placeholder: intended to archive logs older than 30 days; currently only logs. */
  private async archiveOldLogs(): Promise<void> {
    // This would typically involve moving logs to cold storage
    this.logger.log('Archived old logs');
  }

  /** Counts users active in, and users created in, the given time window. */
  private async aggregateUserActivity(startTime: Date, endTime: Date): Promise<UserMetrics> {
    const window = { start: startTime, end: endTime };
    const activeUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.lastActivityAt BETWEEN :start AND :end', window)
      .getCount();
    const newUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.createdAt BETWEEN :start AND :end', window)
      .getCount();
    return {
      activeUsers,
      newUsers,
      period: { start: startTime, end: endTime },
    };
  }

  /** Loads the orders created in the window and derives count, revenue and average order value. */
  private async processOrderStatistics(startTime: Date, endTime: Date): Promise<OrderMetrics> {
    const orders = await this.orderRepository
      .createQueryBuilder('order')
      .where('order.createdAt BETWEEN :start AND :end', {
        start: startTime,
        end: endTime,
      })
      .getMany();
    // Number() is a no-op for numeric amounts and prevents string
    // concatenation should the column be hydrated as a string
    // (NOTE(review): depends on the column type of totalAmount — confirm).
    const totalRevenue = orders.reduce((sum, order) => sum + Number(order.totalAmount), 0);
    const averageOrderValue = orders.length > 0 ? totalRevenue / orders.length : 0;
    return {
      totalOrders: orders.length,
      totalRevenue,
      averageOrderValue,
      period: { start: startTime, end: endTime },
    };
  }

  /**
   * Builds a performance report from the given metrics plus a fresh system
   * health snapshot, persists it and returns it. The report period always
   * ends at the time of the call.
   */
  private async generatePerformanceReport(
    userMetrics: UserMetrics | null,
    orderMetrics: OrderMetrics | null,
    startTime: Date
  ): Promise<Report> {
    const report = {
      id: `perf_${Date.now()}`,
      type: 'performance',
      generatedAt: new Date(),
      period: { start: startTime, end: new Date() },
      data: {
        userMetrics,
        orderMetrics,
        systemHealth: await this.collectSystemHealthMetrics(),
      },
    };
    await this.reportRepository.save(report);
    return report;
  }

  /** Wraps the user activity metrics of the window in a report (not persisted). */
  private async generateUserActivityReport(startTime: Date, endTime: Date): Promise<Report> {
    const userMetrics = await this.aggregateUserActivity(startTime, endTime);
    return {
      id: `user_activity_${Date.now()}`,
      type: 'user_activity',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: userMetrics,
    };
  }

  /** Wraps the order metrics of the window in a report (not persisted). */
  private async generateSalesReport(startTime: Date, endTime: Date): Promise<Report> {
    const orderMetrics = await this.processOrderStatistics(startTime, endTime);
    return {
      id: `sales_${Date.now()}`,
      type: 'sales',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: orderMetrics,
    };
  }

  /** Returns an error report; the statistics are hard-coded sample values, not measured data. */
  private async generateErrorReport(startTime: Date, endTime: Date): Promise<Report> {
    const errorStats = {
      totalErrors: 150,
      criticalErrors: 5,
      warningCount: 45,
      topErrors: [
        { message: 'Database connection timeout', count: 25 },
        { message: 'External API rate limit exceeded', count: 20 },
      ],
    };
    return {
      id: `errors_${Date.now()}`,
      type: 'errors',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: errorStats,
    };
  }

  /** Sends every report, one after another, to the fixed stakeholder list. */
  private async distributeReports(reports: Report[]): Promise<void> {
    const recipients = [
      'admin@company.com',
      'analytics@company.com',
      'management@company.com',
    ];
    for (const report of reports) {
      await this.emailService.sendReport(recipients, report);
    }
    this.logger.log(`Distributed ${reports.length} reports to ${recipients.length} recipients`);
  }

  /** Placeholder for database maintenance (e.g. VACUUM/ANALYZE on PostgreSQL); currently only logs. */
  private async performDatabaseMaintenance(): Promise<void> {
    this.logger.log('Database maintenance completed');
  }

  /** Logs the total user count and the number of users active within the last 30 days. */
  private async updateUserStatistics(): Promise<void> {
    const totalUsers = await this.userRepository.count();
    // The previous filter compared lastActivityAt for equality with "now",
    // which matches practically nobody; the intent is a 30-day window.
    const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000);
    const activeUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.lastActivityAt >= :since', { since: thirtyDaysAgo })
      .getCount();
    // Store in cache or statistics table
    this.logger.log(`Updated user statistics: ${totalUsers} total, ${activeUsers} active`);
  }

  /** Placeholder: intended to archive data older than the 2-year retention period; currently only logs. */
  private async archiveOldData(): Promise<void> {
    this.logger.log('Old data archived successfully');
  }

  /** Placeholder for database-specific index optimization; currently only logs. */
  private async optimizeDatabaseIndexes(): Promise<void> {
    this.logger.log('Database indexes optimized');
  }

  /** Snapshots memory, CPU and uptime of the current process; `criticalIssues` is always empty for now. */
  private async collectSystemHealthMetrics(): Promise<SystemHealthMetrics> {
    const { heapUsed, heapTotal, rss } = process.memoryUsage();
    const { user, system } = process.cpuUsage();
    return {
      timestamp: new Date(),
      memory: { heapUsed, heapTotal, rss },
      cpu: { user, system },
      uptime: process.uptime(),
      criticalIssues: [], // Would be populated based on thresholds
    };
  }

  /** Mails the list of critical issues to the system administrators. */
  private async alertSystemAdmins(issues: string[]): Promise<void> {
    await this.emailService.sendAlert('System Critical Issues Detected', {
      issues,
      timestamp: new Date(),
    });
  }

  /** Placeholder: would push health metrics to a dashboard (WebSocket clients or cache). */
  private async updateHealthDashboard(metrics: SystemHealthMetrics): Promise<void> {
    // Intentionally empty.
  }

  /** Placeholder: would preload frequently accessed data into the cache. */
  private async preloadPopularContent(): Promise<void> {
    // Intentionally empty.
  }

  /** Placeholder: would refresh cached statistics. */
  private async refreshCachedStats(): Promise<void> {
    // Intentionally empty.
  }

  /** Placeholder: would initialize system components at startup. */
  private async initializeSystemComponents(): Promise<void> {
    // Intentionally empty.
  }

  /** Placeholder: would validate the system configuration. */
  private async validateSystemConfiguration(): Promise<void> {
    // Intentionally empty.
  }

  /** Placeholder: would load initial data into the cache. */
  private async loadInitialData(): Promise<void> {
    // Intentionally empty.
  }

  /**
   * Records one execution of `taskName`.
   *
   * @param duration Run time in milliseconds.
   * @param error Failure cause; its message is stored when `success` is false.
   */
  private updateTaskMetrics(
    taskName: string,
    success: boolean,
    duration: number,
    error?: Error
  ): void {
    // The explicit annotation keeps the nullable fields assignable; without
    // it the fallback literal narrows them to `null`.
    const existing: TaskMetrics = this.taskMetrics.get(taskName) ?? {
      taskName,
      executionCount: 0,
      successCount: 0,
      failureCount: 0,
      totalDuration: 0,
      averageDuration: 0,
      lastExecution: null,
      lastError: null,
    };
    existing.executionCount++;
    existing.totalDuration += duration;
    existing.averageDuration = existing.totalDuration / existing.executionCount;
    existing.lastExecution = new Date();
    if (success) {
      existing.successCount++;
    } else {
      existing.failureCount++;
      existing.lastError = error?.message || 'Unknown error';
    }
    this.taskMetrics.set(taskName, existing);
  }

  /**
   * Shared scaffolding of the cron handlers: logs start and end, measures the
   * duration, records the outcome in the task metrics and logs failures.
   * Failures are swallowed unless `options.rethrow` is set.
   */
  private async runTrackedTask(
    taskName: string,
    messages: { start: string; success: string; failure: string },
    work: () => Promise<void>,
    options: { rethrow?: boolean } = {},
  ): Promise<void> {
    const startTime = Date.now();
    try {
      this.logger.log(messages.start);
      await work();
      this.updateTaskMetrics(taskName, true, Date.now() - startTime);
      this.logger.log(messages.success);
    } catch (error: unknown) {
      const failure = TasksService.toError(error);
      this.updateTaskMetrics(taskName, false, Date.now() - startTime, failure);
      this.logger.error(messages.failure, failure.message);
      if (options.rethrow) {
        throw error;
      }
    }
  }

  /** Normalizes an arbitrary thrown value to an Error so `.message` is always available. */
  private static toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }

  // Monitoring and Management Methods

  /** Returns the metrics of every task that has run at least once. */
  getTaskMetrics(): TaskMetrics[] {
    return Array.from(this.taskMetrics.values());
  }

  /** Returns the metrics of one task, or `undefined` if it has not run yet. */
  getTaskMetric(taskName: string): TaskMetrics | undefined {
    return this.taskMetrics.get(taskName);
  }

  /**
   * Lists all registered cron jobs with their running state and next fire
   * date. `nextDate` is left undefined when the job cannot provide one.
   */
  async getCronJobs(): Promise<Array<{ name: string; running: boolean; nextDate?: Date }>> {
    const jobs = this.schedulerRegistry.getCronJobs();
    return Array.from(jobs, ([name, job]) => {
      let nextDate: Date | undefined;
      try {
        // NOTE(review): `toDate()` matches date objects of older `cron`
        // releases; newer ones expose `toJSDate()` — confirm against the
        // installed version.
        nextDate = job.nextDate()?.toDate();
      } catch {
        // e.g. the next fire date lies in the past
        nextDate = undefined;
      }
      return { name, running: job.running, nextDate };
    });
  }

  /** Lists the names of all registered intervals. */
  async getIntervals(): Promise<Array<{ name: string }>> {
    // getIntervals() already yields the interval names; calling `.keys()` on
    // that array would return its numeric indices instead.
    return this.schedulerRegistry.getIntervals().map((name) => ({ name }));
  }
}
// Types and Interfaces
/** Accumulated execution statistics of one scheduled task. */
interface TaskMetrics {
  /** Name under which the task is tracked. */
  taskName: string;
  /** Number of recorded runs (successful and failed). */
  executionCount: number;
  /** Number of runs recorded as successful. */
  successCount: number;
  /** Number of runs recorded as failed. */
  failureCount: number;
  /** Sum of all recorded run durations. */
  totalDuration: number;
  /** `totalDuration` divided by `executionCount`. */
  averageDuration: number;
  /** Time the most recent run was recorded, or `null` before the first run. */
  lastExecution: null | Date;
  /** Message of the most recent failure, or `null` if none was recorded. */
  lastError: null | string;
}
/** User counts for one reporting window. */
interface UserMetrics {
  /** Users whose last activity falls inside the window. */
  activeUsers: number;
  /** Users created inside the window. */
  newUsers: number;
  /** Window the counts refer to. */
  period: {
    start: Date;
    end: Date;
  };
}
/** Order statistics for one reporting window. */
interface OrderMetrics {
  /** Number of orders created inside the window. */
  totalOrders: number;
  /** Sum of the order amounts. */
  totalRevenue: number;
  /** `totalRevenue / totalOrders`, or 0 when there are no orders. */
  averageOrderValue: number;
  /** Window the statistics refer to. */
  period: {
    start: Date;
    end: Date;
  };
}
/** Point-in-time health snapshot of the running process. */
interface SystemHealthMetrics {
  /** Time the snapshot was taken. */
  timestamp: Date;
  /** Memory figures of the process. */
  memory: { heapUsed: number; heapTotal: number; rss: number };
  /** CPU time figures of the process. */
  cpu: { user: number; system: number };
  /** Process uptime. */
  uptime: number;
  /** Descriptions of critical problems; empty when none were detected. */
  criticalIssues: string[];
}
/** Generated report as it is stored and mailed to stakeholders. */
interface Report {
  /** Report identifier. */
  id: string;
  /** Report category, e.g. 'performance', 'sales', 'errors'. */
  type: string;
  /** Time the report was generated. */
  generatedAt: Date;
  /** Window the report covers. */
  period: {
    start: Date;
    end: Date;
  };
  /** Report payload; its shape depends on `type`. */
  data: any;
}
Bull Queue bietet robuste Background Job Processing mit Redis als Backend für skalierbare und zuverlässige Aufgabenverarbeitung.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';
/** Names of all queues registered by {@link QueueModule}. */
const QUEUE_NAMES = [
  'email',
  'image-processing',
  'data-export',
  'analytics',
  'notification',
  'cleanup',
];

/**
 * Builds the global Bull configuration from the application config:
 * Redis connection settings, default job options and stalled-job settings.
 */
const createBullOptions = async (configService: ConfigService) => {
  // NOTE(review): these connection options are handed to the Redis client
  // unchanged; verify maxRetriesPerRequest / enableReadyCheck against the
  // requirements of the installed Bull version.
  const redis = {
    host: configService.get('REDIS_HOST', 'localhost'),
    port: configService.get('REDIS_PORT', 6379),
    password: configService.get('REDIS_PASSWORD'),
    db: configService.get('REDIS_DB', 0),
    // Connection behaviour
    maxRetriesPerRequest: 3,
    retryDelayOnFailover: 100,
    enableReadyCheck: false,
    lazyConnect: true,
    // Socket settings
    family: 4,
    keepAlive: 30000,
  };

  // Applied to every job unless the call site overrides them.
  const defaultJobOptions = {
    removeOnComplete: 100, // keep the last 100 completed jobs
    removeOnFail: 50, // keep the last 50 failed jobs
    attempts: 3, // default number of attempts
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
  };

  const settings = {
    stalledInterval: 30000, // look for stalled jobs every 30s
    maxStalledCount: 1, // how often a stalled job may be recovered
  };

  return { redis, defaultJobOptions, settings };
};

/**
 * Wires Bull into the application: global Redis-backed configuration, the six
 * application queues, their processors and the JobService facade. Re-exports
 * BullModule so importing modules can inject the queues.
 */
@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: createBullOptions,
    }),
    BullModule.registerQueue(...QUEUE_NAMES.map((name) => ({ name }))),
  ],
  providers: [
    EmailProcessor,
    ImageProcessor,
    DataExportProcessor,
    AnalyticsProcessor,
    NotificationProcessor,
    CleanupProcessor,
    JobService,
  ],
  exports: [BullModule],
})
export class QueueModule {}
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue, JobOptions } from 'bull';
/**
 * Facade over the application's Bull queues: enqueues jobs with suitable
 * options and offers job/queue management and statistics by queue name.
 */
@Injectable()
export class JobService {
  // Bull treats 1 as the highest priority and larger numbers as lower
  // priority, so urgent mail gets the smallest value.
  private static readonly PRIORITY_URGENT = 1;
  private static readonly PRIORITY_NORMAL = 5;
  private static readonly PRIORITY_BULK = 10;

  constructor(
    @InjectQueue('email') private emailQueue: Queue,
    @InjectQueue('image-processing') private imageQueue: Queue,
    @InjectQueue('data-export') private exportQueue: Queue,
    @InjectQueue('analytics') private analyticsQueue: Queue,
    @InjectQueue('notification') private notificationQueue: Queue,
    @InjectQueue('cleanup') private cleanupQueue: Queue,
  ) {}

  // Email Jobs

  /** Queues a welcome mail, delayed by one second, with up to 3 attempts. */
  async sendWelcomeEmail(userId: string, email: string): Promise<void> {
    await this.emailQueue.add('welcome-email', {
      userId,
      email,
      timestamp: new Date(),
    }, {
      priority: JobService.PRIORITY_NORMAL,
      delay: 1000,
      attempts: 3,
    });
  }

  /**
   * Queues a password reset mail ahead of all other mail. The token is
   * declared valid for one hour; up to 5 attempts, 5 seconds apart.
   */
  async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
    await this.emailQueue.add('password-reset', {
      email,
      resetToken,
      expiresAt: new Date(Date.now() + 60 * 60 * 1000),
    }, {
      priority: JobService.PRIORITY_URGENT,
      attempts: 5,
      backoff: {
        type: 'fixed',
        delay: 5000,
      },
    });
  }

  /** Queues one low-priority job per recipient in a single bulk call. */
  async sendBulkEmails(recipients: string[], template: string, data: any): Promise<void> {
    const jobs = recipients.map(email => ({
      name: 'bulk-email',
      data: { email, template, data },
      opts: {
        priority: JobService.PRIORITY_BULK,
        attempts: 2,
      },
    }));
    await this.emailQueue.addBulk(jobs);
  }

  // Image Processing Jobs

  /** Queues resizing of an uploaded image to the given sizes (5 minute timeout). */
  async processImageUpload(
    imageId: string,
    filePath: string,
    sizes: string[]
  ): Promise<void> {
    await this.imageQueue.add('resize-images', {
      imageId,
      filePath,
      sizes,
    }, {
      attempts: 3,
      timeout: 300000,
    });
  }

  /** Queues one thumbnail job per image id in a single bulk call. */
  async generateImageThumbnails(imageIds: string[]): Promise<void> {
    const jobs = imageIds.map(imageId => ({
      name: 'generate-thumbnail',
      data: { imageId },
    }));
    await this.imageQueue.addBulk(jobs);
  }

  // Data Export Jobs

  /**
   * Queues a user data export (10 minute timeout).
   *
   * @returns The id of the created job as a string.
   */
  async exportUserData(
    userId: string,
    format: 'csv' | 'json' | 'xlsx',
    filters?: any
  ): Promise<string> {
    const job = await this.exportQueue.add('user-data-export', {
      userId,
      format,
      filters,
      requestedAt: new Date(),
    }, {
      attempts: 2,
      timeout: 600000,
    });
    return job.id.toString();
  }

  /** Registers a repeatable weekly report job (Mondays 09:00). */
  async scheduleWeeklyReport(reportType: string, recipients: string[]): Promise<void> {
    await this.analyticsQueue.add('weekly-report', {
      reportType,
      recipients,
    }, {
      repeat: { cron: '0 9 * * 1' },
      attempts: 3,
    });
  }

  // Notification Jobs

  /** Queues a notification for `scheduledFor`; dates in the past are sent without delay. */
  async scheduleNotification(
    userId: string,
    message: string,
    scheduledFor: Date
  ): Promise<void> {
    const delay = scheduledFor.getTime() - Date.now();
    await this.notificationQueue.add('scheduled-notification', {
      userId,
      message,
      scheduledFor,
    }, {
      delay: delay > 0 ? delay : 0,
      attempts: 3,
    });
  }

  /** Queues push notifications in batches of 100 user ids per job. */
  async sendPushNotifications(
    userIds: string[],
    title: string,
    body: string,
    data?: any
  ): Promise<void> {
    const batchSize = 100;
    for (let i = 0; i < userIds.length; i += batchSize) {
      const batch = userIds.slice(i, i + batchSize);
      await this.notificationQueue.add('push-notification-batch', {
        userIds: batch,
        title,
        body,
        data,
      }, {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      });
    }
  }

  // Cleanup Jobs

  /** Queues a one-off cleanup job named `cleanup-<type>` (5 minute timeout). */
  async scheduleCleanupTask(
    type: 'temp-files' | 'old-logs' | 'expired-sessions',
    options?: any
  ): Promise<void> {
    await this.cleanupQueue.add(`cleanup-${type}`, {
      type,
      options,
      scheduledAt: new Date(),
    }, {
      attempts: 2,
      timeout: 300000,
    });
  }

  // Recurring Jobs

  /**
   * Registers the repeatable jobs: daily cleanup (02:00), hourly statistics
   * and weekly reports (Mondays 09:00).
   *
   * NOTE(review): the visible AnalyticsProcessor handles 'weekly-report'
   * (singular) and expects `reportType`/`recipients` in the job data, so the
   * 'weekly-reports' job registered here appears to have no matching
   * handler — confirm the intended name and payload. The handler for
   * 'daily-cleanup' is not visible in this excerpt either.
   */
  async setupRecurringJobs(): Promise<void> {
    await this.cleanupQueue.add('daily-cleanup', {}, {
      repeat: { cron: '0 2 * * *' },
      attempts: 2,
    });
    await this.analyticsQueue.add('hourly-stats', {}, {
      repeat: { cron: '0 * * * *' },
      attempts: 3,
    });
    await this.analyticsQueue.add('weekly-reports', {}, {
      repeat: { cron: '0 9 * * 1' },
      attempts: 3,
    });
  }

  // Job Management

  /**
   * Returns a status summary of a job, or `null` if the job does not exist.
   *
   * @throws Error for an unknown queue name.
   */
  async getJobStatus(queueName: string, jobId: string): Promise<any> {
    const job = await this.getQueue(queueName).getJob(jobId);
    if (!job) {
      return null;
    }
    return {
      id: job.id,
      name: job.name,
      data: job.data,
      progress: job.progress(),
      state: await job.getState(),
      createdAt: new Date(job.timestamp),
      processedAt: job.processedOn ? new Date(job.processedOn) : null,
      finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
      failedReason: job.failedReason,
      attempts: job.attemptsMade,
      delay: job.delay,
    };
  }

  /**
   * Removes a job from its queue.
   *
   * @returns `true` if the job existed and was removed, `false` otherwise.
   * @throws Error for an unknown queue name.
   */
  async cancelJob(queueName: string, jobId: string): Promise<boolean> {
    const job = await this.getQueue(queueName).getJob(jobId);
    if (!job) {
      return false;
    }
    await job.remove();
    return true;
  }

  /** Retries a job; does nothing if the job does not exist. */
  async retryJob(queueName: string, jobId: string): Promise<void> {
    const job = await this.getQueue(queueName).getJob(jobId);
    if (job) {
      await job.retry();
    }
  }

  /** Pauses the named queue. */
  async pauseQueue(queueName: string): Promise<void> {
    await this.getQueue(queueName).pause();
  }

  /** Resumes the named queue. */
  async resumeQueue(queueName: string): Promise<void> {
    await this.getQueue(queueName).resume();
  }

  /** Single source of truth for the name → queue mapping. */
  private queuesByName(): Map<string, Queue> {
    return new Map<string, Queue>([
      ['email', this.emailQueue],
      ['image-processing', this.imageQueue],
      ['data-export', this.exportQueue],
      ['analytics', this.analyticsQueue],
      ['notification', this.notificationQueue],
      ['cleanup', this.cleanupQueue],
    ]);
  }

  /**
   * Resolves a queue by its registered name.
   *
   * @throws Error `Unknown queue: <name>` for an unregistered name.
   */
  private getQueue(queueName: string): Queue {
    const queue = this.queuesByName().get(queueName);
    if (!queue) {
      throw new Error(`Unknown queue: ${queueName}`);
    }
    return queue;
  }

  // Queue Statistics

  /**
   * Returns the job counts per state and the paused flag of one queue.
   * Uses the queue's count getters instead of loading every job just to
   * measure the length of the result.
   *
   * @throws Error for an unknown queue name.
   */
  async getQueueStats(queueName: string): Promise<QueueStats> {
    const queue = this.getQueue(queueName);
    const [waiting, active, completed, failed, delayed, isPaused] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
      queue.isPaused(),
    ]);
    return { queueName, waiting, active, completed, failed, delayed, isPaused };
  }

  /** Returns the statistics of every registered queue. */
  async getAllQueueStats(): Promise<QueueStats[]> {
    const queueNames = Array.from(this.queuesByName().keys());
    return await Promise.all(
      queueNames.map(name => this.getQueueStats(name))
    );
  }
}
/** Job counts per state for one queue, plus its paused flag. */
interface QueueStats {
  /** Name of the queue the figures belong to. */
  queueName: string;
  /** Jobs waiting to be processed. */
  waiting: number;
  /** Jobs currently being processed. */
  active: number;
  /** Completed jobs still retained in the queue. */
  completed: number;
  /** Failed jobs still retained in the queue. */
  failed: number;
  /** Jobs scheduled for later execution. */
  delayed: number;
  /** Whether the queue is currently paused. */
  isPaused: boolean;
}
Job Processors definieren, wie verschiedene Arten von Jobs verarbeitet werden.
import { Process, Processor, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';
// Email Processor
/**
 * Worker for the 'email' queue: welcome mails, password reset mails and
 * templated bulk mails, plus lifecycle logging for every job of the queue.
 */
@Processor('email')
export class EmailProcessor {
  private readonly logger = new Logger(EmailProcessor.name);

  constructor(
    private emailService: EmailService,
    private userService: UserService,
  ) {}

  /**
   * Sends the welcome mail with a verification link.
   *
   * @throws Error if the user does not exist; any send failure is logged and
   *   re-thrown so the queue can retry the job.
   */
  @Process('welcome-email')
  async handleWelcomeEmail(job: Job): Promise<void> {
    const { userId, email } = job.data;
    this.logger.log(`Processing welcome email for user: ${userId}`);
    try {
      const user = await this.userService.findById(userId);
      if (!user) {
        throw new Error(`User not found: ${userId}`);
      }
      // Tokens are URL-encoded so characters such as '+', '&' or '=' cannot
      // corrupt the query string.
      const token = encodeURIComponent(user.verificationToken);
      await this.emailService.sendWelcomeEmail(email, {
        name: user.name,
        verificationLink: `${process.env.APP_URL}/verify?token=${token}`,
      });
      this.logger.log(`Welcome email sent successfully to: ${email}`);
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to send welcome email: ${message}`);
      throw error;
    }
  }

  /**
   * Sends the password reset mail unless the token has already expired.
   *
   * @throws Error 'Reset token has expired' for an expired token. The job is
   *   discarded first, because retrying an expired token can never succeed.
   */
  @Process('password-reset')
  async handlePasswordReset(job: Job): Promise<void> {
    const { email, resetToken, expiresAt } = job.data;
    const expiry = new Date(expiresAt);
    if (new Date() > expiry) {
      await job.discard();
      throw new Error('Reset token has expired');
    }
    await this.emailService.sendPasswordResetEmail(email, {
      resetLink: `${process.env.APP_URL}/reset-password?token=${encodeURIComponent(resetToken)}`,
      expiresAt: expiry,
    });
    this.logger.log(`Password reset email sent to: ${email}`);
  }

  /** Sends one templated mail after a 100 ms pause that throttles the email service. */
  @Process('bulk-email')
  async handleBulkEmail(job: Job): Promise<void> {
    const { email, template, data } = job.data;
    await new Promise(resolve => setTimeout(resolve, 100));
    await this.emailService.sendTemplatedEmail(email, template, data);
  }

  /** Logs the start of every job of this queue. */
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(`Processing email job ${job.id} of type ${job.name}`);
  }

  /** Logs the successful completion of a job; the result itself is not used. */
  @OnQueueCompleted()
  onCompleted(job: Job, result: any) {
    this.logger.log(`Email job ${job.id} completed successfully`);
  }

  /** Logs the failure reason of a job. */
  @OnQueueFailed()
  onFailed(job: Job, err: Error) {
    this.logger.error(`Email job ${job.id} failed: ${err.message}`);
  }
}
// Image Processing Processor
/** Worker for the 'image-processing' queue: resized versions and thumbnails. */
@Processor('image-processing')
export class ImageProcessor {
  private readonly logger = new Logger(ImageProcessor.name);

  constructor(
    private imageService: ImageService,
    private storageService: StorageService,
  ) {}

  /**
   * Creates one resized version per requested size, uploads each to the
   * 'resized' folder and stores the results on the image record. Progress is
   * reported as 10 at the start, 10–90 across the sizes and 100 at the end.
   * Failures are logged and re-thrown.
   */
  @Process('resize-images')
  async handleImageResize(job: Job): Promise<void> {
    const { imageId, filePath, sizes } = job.data;
    this.logger.log(`Processing image resize for: ${imageId}`);
    try {
      await job.progress(10);

      const image = await this.imageService.findById(imageId);
      if (!image) {
        throw new Error(`Image not found: ${imageId}`);
      }

      const versions = [];
      const sizeCount = sizes.length;
      for (let position = 0; position < sizes.length; position++) {
        await job.progress(10 + (position / sizeCount) * 80);
        versions.push(await this.resizeAndUpload(filePath, sizes[position]));
      }

      await this.imageService.updateResizedVersions(imageId, versions);
      await job.progress(100);
      this.logger.log(`Image resize completed for: ${imageId}`);
    } catch (error) {
      this.logger.error(`Image resize failed for ${imageId}: ${error.message}`);
      throw error;
    }
  }

  /** Resizes the source file to one size and uploads the result publicly. */
  private async resizeAndUpload(filePath: any, size: any) {
    const resizedPath = await this.imageService.resizeImage(filePath, size);
    const uploadedUrl = await this.storageService.uploadFile(resizedPath, {
      folder: 'resized',
      public: true,
    });
    return { size, url: uploadedUrl, path: resizedPath };
  }

  /**
   * Generates a thumbnail for an image, uploads it to the 'thumbnails'
   * folder and stores its URL on the image record. Failures are logged and
   * re-thrown.
   */
  @Process('generate-thumbnail')
  async handleThumbnailGeneration(job: Job): Promise<void> {
    const { imageId } = job.data;
    try {
      const image = await this.imageService.findById(imageId);
      if (!image) {
        throw new Error(`Image not found: ${imageId}`);
      }

      const localThumbnail = await this.imageService.generateThumbnail(image.filePath);
      const uploadOptions = { folder: 'thumbnails', public: true };
      const publicUrl = await this.storageService.uploadFile(localThumbnail, uploadOptions);

      await this.imageService.updateThumbnail(imageId, publicUrl);
      this.logger.log(`Thumbnail generated for image: ${imageId}`);
    } catch (error) {
      this.logger.error(`Thumbnail generation failed: ${error.message}`);
      throw error;
    }
  }
}
// Data Export Processor
/** Worker for the 'data-export' queue. */
@Processor('data-export')
export class DataExportProcessor {
  private readonly logger = new Logger(DataExportProcessor.name);

  constructor(
    private userService: UserService,
    private exportService: ExportService,
    private storageService: StorageService,
    private emailService: EmailService,
  ) {}

  /**
   * Exports a user's data in the requested format, uploads the file to the
   * private 'exports' folder with a 24 hour lifetime and mails the download
   * link to the user. Progress is reported at 10/40/70/90/100.
   *
   * @throws Error `User not found: <id>` if the user does not exist. This is
   *   checked before any export work starts, so no file is generated or
   *   uploaded for a missing user. Other failures are logged and re-thrown.
   */
  @Process('user-data-export')
  async handleUserDataExport(job: Job): Promise<void> {
    const { userId, format, filters } = job.data;
    this.logger.log(`Starting data export for user: ${userId}`);
    // One constant feeds both the storage lifetime (seconds) and the expiry
    // shown in the mail (milliseconds).
    const linkLifetimeSeconds = 24 * 60 * 60;
    try {
      const user = await this.userService.findById(userId);
      if (!user) {
        throw new Error(`User not found: ${userId}`);
      }
      await job.progress(10);

      const userData = await this.userService.getExportData(userId, filters);
      await job.progress(40);

      const exportFile = await this.exportService.generateExport(userData, format);
      await job.progress(70);

      const fileUrl = await this.storageService.uploadFile(exportFile.path, {
        folder: 'exports',
        private: true,
        expiresIn: linkLifetimeSeconds,
      });
      await job.progress(90);

      await this.emailService.sendDataExportReady(user.email, {
        downloadUrl: fileUrl,
        expiresAt: new Date(Date.now() + linkLifetimeSeconds * 1000),
        format,
      });
      await job.progress(100);
      this.logger.log(`Data export completed for user: ${userId}`);
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Data export failed for user ${userId}: ${message}`);
      throw error;
    }
  }
}
// Analytics Processor
/** Worker for the 'analytics' queue: hourly statistics and weekly reports. */
@Processor('analytics')
export class AnalyticsProcessor {
  private readonly logger = new Logger(AnalyticsProcessor.name);

  constructor(
    private analyticsService: AnalyticsService,
    private reportService: ReportService,
    private emailService: EmailService,
  ) {}

  /**
   * Collects and stores the metrics of the past hour and mails an alert if
   * anomalies are detected. Failures are logged and re-thrown.
   */
  @Process('hourly-stats')
  async handleHourlyStats(job: Job): Promise<void> {
    this.logger.log('Processing hourly statistics');
    const windowEnd = new Date();
    const windowStart = new Date(windowEnd.getTime() - 60 * 60 * 1000);
    try {
      const hourlyMetrics = await this.analyticsService.collectHourlyMetrics(windowStart, windowEnd);
      await this.analyticsService.storeMetrics(hourlyMetrics);

      const detected = await this.analyticsService.detectAnomalies(hourlyMetrics);
      const hasAnomalies = detected.length > 0;
      if (hasAnomalies) {
        await this.emailService.sendAnomalyAlert(detected);
      }

      this.logger.log('Hourly statistics processed successfully');
    } catch (error) {
      this.logger.error(`Hourly stats processing failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Generates the report of the given type for the past seven days and mails
   * it to the recipients from the job data. Failures are logged and
   * re-thrown.
   */
  @Process('weekly-report')
  async handleWeeklyReport(job: Job): Promise<void> {
    const { reportType, recipients } = job.data;
    this.logger.log(`Generating weekly report: ${reportType}`);
    try {
      const periodEnd = new Date();
      const periodStart = new Date(periodEnd.getTime() - 7 * 24 * 60 * 60 * 1000);
      const weeklyReport = await this.reportService.generateWeeklyReport(
        reportType,
        periodStart,
        periodEnd
      );
      await this.emailService.sendWeeklyReport(recipients, weeklyReport);
      this.logger.log(`Weekly report sent to ${recipients.length} recipients`);
    } catch (error) {
      this.logger.error(`Weekly report generation failed: ${error.message}`);
      throw error;
    }
  }
}
// Notification Processor
/** Worker for the 'notification' queue: scheduled and batched push notifications. */
@Processor('notification')
export class NotificationProcessor {
  private readonly logger = new Logger(NotificationProcessor.name);

  constructor(
    private notificationService: NotificationService,
    private pushService: PushService,
  ) {}

  /**
   * Sends a scheduled notification. If the job runs before its scheduled
   * time a warning is logged, but the notification is sent regardless.
   */
  @Process('scheduled-notification')
  async handleScheduledNotification(job: Job): Promise<void> {
    const { userId, message, scheduledFor } = job.data;

    const ranEarly = new Date() < new Date(scheduledFor);
    if (ranEarly) {
      this.logger.warn(`Notification for user ${userId} executed before scheduled time`);
    }

    await this.notificationService.sendNotification(userId, message);
    this.logger.log(`Scheduled notification sent to user: ${userId}`);
  }

  /**
   * Sends one push notification to a batch of users and logs how many
   * deliveries succeeded and failed. A failure of the batch call itself is
   * logged and re-thrown.
   */
  @Process('push-notification-batch')
  async handlePushNotificationBatch(job: Job): Promise<void> {
    const { userIds, title, body, data } = job.data;
    this.logger.log(`Sending push notifications to ${userIds.length} users`);
    try {
      const payload = { title, body, data };
      const results = await this.pushService.sendBatchNotifications(userIds, payload);

      let successCount = 0;
      for (const outcome of results) {
        if (outcome.success) {
          successCount += 1;
        }
      }
      const failureCount = results.length - successCount;

      this.logger.log(`Push notifications sent: ${successCount} success, ${failureCount} failed`);
    } catch (error) {
      this.logger.error(`Batch push notification failed: ${error.message}`);
      throw error;
    }
  }
}
// Cleanup Processor
@Processor('cleanup')
export class CleanupProcessor {
private readonly logger = new Logger(CleanupProcessor.name);
constructor(
private cleanupService: CleanupService,
) {}
/**
 * Handles `cleanup-temp-files` jobs: delegates to the cleanup service and
 * logs the count it returns. Failures are logged and rethrown.
 */
@Process('cleanup-temp-files')
async handleTempFileCleanup(job: Job): Promise<void> {
  this.logger.log('Starting temporary file cleanup');

  try {
    const removedFiles = await this.cleanupService.cleanupTempFiles();
    this.logger.log(`Cleaned up ${removedFiles} temporary files`);
  } catch (err) {
    this.logger.error(`Temp file cleanup failed: ${err.message}`);
    throw err;
  }
}
/**
 * Handles `cleanup-old-logs` jobs: asks the cleanup service to archive old
 * logs and logs the count it returns. Failures are logged and rethrown.
 */
@Process('cleanup-old-logs')
async handleLogCleanup(job: Job): Promise<void> {
  this.logger.log('Starting old log cleanup');

  try {
    const archivedLogs = await this.cleanupService.archiveOldLogs();
    this.logger.log(`Archived ${archivedLogs} old log files`);
  } catch (err) {
    this.logger.error(`Log cleanup failed: ${err.message}`);
    throw err;
  }
}
/**
 * Handles `cleanup-expired-sessions` jobs: asks the cleanup service to clear
 * expired sessions and logs the count it returns. Failures are logged and
 * rethrown.
 */
@Process('cleanup-expired-sessions')
async handleSessionCleanup(job: Job): Promise<void> {
  this.logger.log('Starting expired session cleanup');

  try {
    const clearedSessions = await this.cleanupService.clearExpiredSessions();
    this.logger.log(`Cleared ${clearedSessions} expired sessions`);
  } catch (err) {
    this.logger.error(`Session cleanup failed: ${err.message}`);
    throw err;
  }
}
/**
 * Handles `daily-cleanup` jobs: runs the four cleanup phases one after the
 * other and reports job progress (20, 40, 60, 80) before each phase and 100
 * once all phases are done. The first failing phase aborts the run; the error
 * is logged and rethrown.
 */
@Process('daily-cleanup')
async handleDailyCleanup(job: Job): Promise<void> {
  this.logger.log('Starting daily cleanup tasks');

  // Each entry: progress value reported before the phase, then the phase itself.
  const phases: Array<{ progressBefore: number; run: () => Promise<unknown> }> = [
    { progressBefore: 20, run: () => this.cleanupService.cleanupTempFiles() },
    { progressBefore: 40, run: () => this.cleanupService.clearExpiredSessions() },
    { progressBefore: 60, run: () => this.cleanupService.archiveOldLogs() },
    { progressBefore: 80, run: () => this.cleanupService.optimizeDatabase() },
  ];

  try {
    for (const phase of phases) {
      await job.progress(phase.progressBefore);
      await phase.run();
    }
    await job.progress(100);

    this.logger.log('Daily cleanup completed successfully');
  } catch (err) {
    this.logger.error(`Daily cleanup failed: ${err.message}`);
    throw err;
  }
}
}

Robuste Retry-Mechanismen sind essentiell für zuverlässiges Background Job Processing.
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue, JobOptions } from 'bull';
/**
 * Adds jobs to the injected Bull queues (`email`, `critical`) with different
 * retry/backoff policies.
 *
 * Bull serializes job options before storing them, so a backoff policy can
 * only be a number, a built-in type (`fixed`, `exponential`) or the NAME of a
 * custom strategy registered in the queue's `settings.backoffStrategies`.
 * Functions placed directly in the job options never reach the worker.
 */
@Injectable()
export class RetryService {
  /**
   * Name of the custom backoff strategy used by
   * {@link RetryService.addJobWithConditionalRetry}. The queue must register
   * {@link RetryService.conditionalBackoff} under this key, e.g.
   * `settings: { backoffStrategies: { [RetryService.CONDITIONAL_BACKOFF]: RetryService.conditionalBackoff } }`.
   */
  static readonly CONDITIONAL_BACKOFF = 'conditional';

  constructor(
    @InjectQueue('email') private emailQueue: Queue,
    @InjectQueue('critical') private criticalQueue: Queue,
  ) {}

  /**
   * Backoff strategy that picks the retry delay from the error message.
   *
   * @param attemptsMade attempt counter as passed to the strategy by Bull
   * @param err error thrown by the failed attempt
   * @returns delay in milliseconds before the next attempt, or `-1` to stop retrying
   */
  static conditionalBackoff(attemptsMade: number, err: Error): number {
    const message = err?.message ?? '';

    if (message.includes('RATE_LIMIT')) {
      return 60000; // 1 minute for rate limit errors
    }
    if (message.includes('NETWORK_ERROR')) {
      return Math.pow(2, attemptsMade) * 1000; // Exponential for network errors
    }
    if (message.includes('AUTH_ERROR')) {
      return -1; // Don't retry auth errors
    }
    // Default exponential backoff
    return Math.pow(2, attemptsMade) * 2000;
  }

  /**
   * Adds a job that is retried with Bull's built-in exponential backoff,
   * starting at 2 seconds.
   *
   * @param queueName `email` or `critical`
   * @param jobName named job type to enqueue
   * @param data job payload
   * @param maxAttempts total number of attempts (default 5)
   * @throws Error when `queueName` is not a known queue
   */
  async addJobWithExponentialBackoff(
    queueName: string,
    jobName: string,
    data: any,
    maxAttempts: number = 5
  ): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.add(jobName, data, {
      attempts: maxAttempts,
      backoff: {
        type: 'exponential',
        delay: 2000, // Start with 2 seconds
      },
      // Bound the number of finished jobs kept in Redis.
      removeOnComplete: 100,
      removeOnFail: 50,
    });
  }

  /**
   * Adds a job that is retried with a constant delay between attempts.
   *
   * @param queueName `email` or `critical`
   * @param jobName named job type to enqueue
   * @param data job payload
   * @param attempts total number of attempts (default 3)
   * @param delay delay between attempts in milliseconds (default 5000)
   * @throws Error when `queueName` is not a known queue
   */
  async addJobWithFixedDelay(
    queueName: string,
    jobName: string,
    data: any,
    attempts: number = 3,
    delay: number = 5000
  ): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.add(jobName, data, {
      attempts,
      backoff: {
        type: 'fixed',
        delay,
      },
    });
  }

  /**
   * Adds a job that uses the custom backoff strategy registered under the
   * name `custom` in the queue settings. The delay list and error lists of
   * `retryConfig` are forwarded as strategy options so the strategy can
   * read them; previously they were silently dropped.
   *
   * @param queueName `email` or `critical`
   * @param jobName named job type to enqueue
   * @param data job payload
   * @param retryConfig attempt limit plus options handed to the strategy
   * @throws Error when `queueName` is not a known queue
   */
  async addJobWithCustomRetry(
    queueName: string,
    jobName: string,
    data: any,
    retryConfig: CustomRetryConfig
  ): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.add(jobName, data, {
      attempts: retryConfig.maxAttempts,
      backoff: {
        type: 'custom',
        // Passed to the strategy as its third argument.
        options: {
          delays: retryConfig.delays,
          retryableErrors: retryConfig.retryableErrors,
          nonRetryableErrors: retryConfig.nonRetryableErrors,
        },
      },
    });
  }

  /**
   * Adds a job (5 attempts) whose retry delay depends on the error type.
   *
   * The policy lives in {@link RetryService.conditionalBackoff} and is
   * referenced by name, because a function in the job options would be lost
   * when the options are serialized. The target queue must register the
   * strategy under {@link RetryService.CONDITIONAL_BACKOFF}.
   *
   * @param queueName `email` or `critical`
   * @param jobName named job type to enqueue
   * @param data job payload
   * @throws Error when `queueName` is not a known queue
   */
  async addJobWithConditionalRetry(
    queueName: string,
    jobName: string,
    data: any
  ): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.add(jobName, data, {
      attempts: 5,
      backoff: {
        type: RetryService.CONDITIONAL_BACKOFF,
      },
    });
  }

  /** Maps a queue name to the injected queue instance; throws for unknown names. */
  private getQueue(queueName: string): Queue {
    switch (queueName) {
      case 'email': return this.emailQueue;
      case 'critical': return this.criticalQueue;
      default: throw new Error(`Unknown queue: ${queueName}`);
    }
  }
}
/**
 * Retry configuration accepted by `RetryService.addJobWithCustomRetry`.
 */
interface CustomRetryConfig {
/** Used as the job's `attempts` option (total number of attempts). */
maxAttempts: number;
/** Presumably the delays between attempts in milliseconds — TODO confirm unit and indexing. */
delays: number[];
/** Presumably error markers for which a retry is allowed — TODO confirm matching rule. */
retryableErrors: string[];
/** Presumably error markers for which no retry should happen — TODO confirm matching rule. */
nonRetryableErrors: string[];
}
// Enhanced Job Processor with Retry Logic
/**
 * Bull processor for the `enhanced-processor` queue. Runs `critical-operation`
 * jobs and, on failure, decides between retry, dead-lettering and final
 * failure handling before rethrowing the error to Bull.
 *
 * NOTE(review): this class reads `this.deadLetterQueue`,
 * `this.notificationService`, `this.failureLogService` and
 * `this.compensationService`, but none of them is declared or injected in the
 * visible class. They have to be provided (e.g. through constructor
 * injection) before this processor can run.
 */
@Processor('enhanced-processor')
export class EnhancedJobProcessor {
  private readonly logger = new Logger(EnhancedJobProcessor.name);

  /**
   * Executes the operation named in `job.data.operation`.
   *
   * On failure the error is analysed by {@link handleJobError} and then
   * rethrown unchanged so Bull records the failure. A failure inside the
   * error handling itself is only logged, so it can never replace the
   * original error.
   */
  @Process('critical-operation')
  async handleCriticalOperation(job: Job): Promise<void> {
    const { operation, data } = job.data;

    try {
      await this.performOperation(operation, data);
    } catch (error) {
      const failure = error instanceof Error ? error : new Error(String(error));
      try {
        await this.handleJobError(job, failure);
      } catch (handlingError) {
        const reason = handlingError instanceof Error ? handlingError.message : String(handlingError);
        this.logger.error(`Error handling for job ${job.id} failed: ${reason}`);
      }
      throw error; // Re-throw to trigger Bull's retry mechanism
    }
  }

  /**
   * Logs the failure and classifies it:
   * - non-retryable error: copy to the dead letter queue and discard the job
   *   so Bull does not retry it, then run final failure handling;
   * - last allowed attempt: run final failure handling;
   * - otherwise: log the estimated delay until the next attempt.
   */
  private async handleJobError(job: Job, error: Error): Promise<void> {
    // Inside a processor `attemptsMade` counts the PREVIOUS attempts only;
    // Bull increments it after the processor has thrown.
    const currentAttempt = job.attemptsMade + 1;
    // Bull runs a job exactly once when no `attempts` option was given.
    const maxAttempts = job.opts.attempts ?? 1;

    // Log detailed error information
    this.logger.error(`Job ${job.id} failed (attempt ${currentAttempt}/${maxAttempts})`, {
      jobId: job.id,
      jobName: job.name,
      error: error.message,
      stack: error.stack,
      attempt: currentAttempt,
      attemptsMade: job.attemptsMade,
      maxAttempts,
      data: job.data,
    });

    const nonRetryable = this.isNonRetryableError(error);
    if (nonRetryable) {
      await this.moveToDeadLetterQueue(job, error);
      // Without this, rethrowing would still trigger the remaining attempts.
      await job.discard();
    }

    if (nonRetryable || currentAttempt >= maxAttempts) {
      await this.handleFinalFailure(job, error);
      return; // no retry will follow, so do not announce one
    }

    // Log retry information
    const nextDelay = this.calculateNextDelay(currentAttempt, error);
    this.logger.warn(`Job ${job.id} will be retried in ${nextDelay}ms`);
  }

  /**
   * Runs once a job will not be attempted again: alerts administrators for
   * jobs flagged `critical`, stores the failure, and executes the job's
   * compensation action when one is present.
   */
  private async handleFinalFailure(job: Job, error: Error): Promise<void> {
    // Send alert for critical job failures
    if (job.data.critical) {
      await this.alertAdministrators(job, error);
    }

    // Store failure information for analysis
    await this.logJobFailure(job, error);

    // Attempt compensation if possible
    if (job.data.compensationAction) {
      await this.executeCompensation(job.data.compensationAction);
    }
  }

  /** Returns true when the error message contains one of the non-retryable markers. */
  private isNonRetryableError(error: Error): boolean {
    const nonRetryablePatterns = [
      'VALIDATION_ERROR',
      'AUTH_ERROR',
      'PERMISSION_DENIED',
      'RESOURCE_NOT_FOUND',
      'INVALID_INPUT',
    ];
    return nonRetryablePatterns.some(pattern => error.message.includes(pattern));
  }

  /**
   * Estimates the delay until the next attempt, for logging only. The real
   * delay is determined by the job's own `backoff` option, which this method
   * does not read.
   *
   * @param attemptsMade number of attempts made so far, including the failed one
   * @param error error of the failed attempt
   * @returns estimated delay in milliseconds
   */
  private calculateNextDelay(attemptsMade: number, error: Error): number {
    // Custom delay calculation based on error type
    if (error.message.includes('RATE_LIMIT')) {
      return 60000; // 1 minute for rate limits
    }
    if (error.message.includes('SERVICE_UNAVAILABLE')) {
      return Math.pow(2, attemptsMade) * 5000; // Exponential with 5s base
    }
    // Default exponential backoff
    return Math.pow(2, attemptsMade) * 2000;
  }

  /** Copies the job and its error details to the dead letter queue for manual inspection. */
  private async moveToDeadLetterQueue(job: Job, error: Error): Promise<void> {
    const deadLetterData = {
      originalJob: {
        id: job.id,
        name: job.name,
        data: job.data,
        opts: job.opts,
      },
      error: {
        message: error.message,
        stack: error.stack,
      },
      failedAt: new Date(),
      attemptsMade: job.attemptsMade,
    };

    // Add to dead letter queue
    await this.deadLetterQueue.add('dead-letter', deadLetterData);
    this.logger.error(`Job ${job.id} moved to dead letter queue due to non-retryable error`);
  }

  /** Sends a critical failure alert describing the job and its error. */
  private async alertAdministrators(job: Job, error: Error): Promise<void> {
    await this.notificationService.sendCriticalAlert({
      subject: 'Critical Job Failure',
      jobId: job.id,
      jobName: job.name,
      error: error.message,
      timestamp: new Date(),
    });
  }

  /** Stores detailed failure information for later analysis. */
  private async logJobFailure(job: Job, error: Error): Promise<void> {
    await this.failureLogService.log({
      jobId: job.id,
      jobName: job.name,
      jobData: job.data,
      error: error.message,
      stack: error.stack,
      attemptsMade: job.attemptsMade,
      failedAt: new Date(),
    });
  }

  /**
   * Executes the compensation action. This is best effort: a failing
   * compensation is logged and deliberately not rethrown.
   */
  private async executeCompensation(compensationAction: any): Promise<void> {
    try {
      await this.compensationService.execute(compensationAction);
      this.logger.log(`Compensation executed for failed job`);
    } catch (compensationError) {
      const reason = compensationError instanceof Error
        ? compensationError.message
        : String(compensationError);
      this.logger.error(`Compensation failed: ${reason}`);
    }
  }

  /**
   * Dispatches to the simulated operation selected by `operation`.
   *
   * @throws Error for an unknown operation name
   */
  private async performOperation(operation: string, data: any): Promise<void> {
    switch (operation) {
      case 'api-call':
        await this.makeApiCall(data);
        break;
      case 'database-operation':
        await this.performDatabaseOperation(data);
        break;
      case 'file-processing':
        await this.processFile(data);
        break;
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }

  /** Simulated API call: fails randomly in about 30% of invocations. */
  private async makeApiCall(data: any): Promise<void> {
    if (Math.random() < 0.3) {
      throw new Error('NETWORK_ERROR: Failed to connect to external API');
    }
  }

  /** Simulated database operation: fails randomly in about 20% of invocations. */
  private async performDatabaseOperation(data: any): Promise<void> {
    if (Math.random() < 0.2) {
      throw new Error('DATABASE_ERROR: Connection timeout');
    }
  }

  /** Simulated file processing: fails randomly in about 10% of invocations. */
  private async processFile(data: any): Promise<void> {
    if (Math.random() < 0.1) {
      throw new Error('FILE_ERROR: Invalid file format');
    }
  }
}
// Circuit Breaker for Job Processing
/**
 * In-memory circuit breaker keyed by service name.
 *
 * CLOSED: calls pass through and failures are counted. After
 * `failureThreshold` consecutive failures the circuit becomes OPEN and calls
 * are rejected without running until `recoveryTimeout` milliseconds have
 * passed. The next call then runs as a HALF_OPEN probe: success closes the
 * circuit, failure opens it again.
 */
@Injectable()
export class JobCircuitBreaker {
  private static readonly DEFAULT_FAILURE_THRESHOLD = 5;
  private static readonly DEFAULT_RECOVERY_TIMEOUT_MS = 60000;

  private circuits = new Map<string, CircuitState>();

  /**
   * Runs `operation` under the circuit registered for `serviceName`.
   *
   * @param serviceName key identifying the circuit
   * @param operation the call to protect
   * @param options thresholds; only applied when the circuit for
   *   `serviceName` is created, later values are ignored
   * @returns the result of `operation`
   * @throws Error `Circuit breaker OPEN for <serviceName>` while the circuit
   *   is open; otherwise whatever `operation` throws
   */
  async executeWithCircuitBreaker<T>(
    serviceName: string,
    operation: () => Promise<T>,
    options: CircuitBreakerOptions = {}
  ): Promise<T> {
    const circuit = this.getOrCreateCircuit(serviceName, options);

    if (circuit.state === 'OPEN') {
      if (Date.now() < circuit.nextRetryTime) {
        throw new Error(`Circuit breaker OPEN for ${serviceName}`);
      }
      // Recovery timeout elapsed: let this call through as a probe.
      circuit.state = 'HALF_OPEN';
    }

    try {
      const result = await operation();
      this.recordSuccess(circuit);
      return result;
    } catch (error) {
      this.recordFailure(circuit);
      throw error;
    }
  }

  /** Returns the circuit for `serviceName`, creating it in CLOSED state on first use. */
  private getOrCreateCircuit(serviceName: string, options: CircuitBreakerOptions): CircuitState {
    let circuit = this.circuits.get(serviceName);
    if (circuit === undefined) {
      circuit = {
        state: 'CLOSED',
        failureCount: 0,
        successCount: 0,
        lastFailureTime: 0,
        nextRetryTime: 0,
        options: {
          // `??` instead of object spread: an explicitly passed `undefined`
          // must not wipe out the default.
          failureThreshold: options.failureThreshold ?? JobCircuitBreaker.DEFAULT_FAILURE_THRESHOLD,
          recoveryTimeout: options.recoveryTimeout ?? JobCircuitBreaker.DEFAULT_RECOVERY_TIMEOUT_MS,
        },
      };
      this.circuits.set(serviceName, circuit);
    }
    return circuit;
  }

  /** Counts a success, resets the failure streak and closes a HALF_OPEN circuit. */
  private recordSuccess(circuit: CircuitState): void {
    circuit.successCount++;
    circuit.failureCount = 0;
    if (circuit.state === 'HALF_OPEN') {
      circuit.state = 'CLOSED';
    }
  }

  /**
   * Counts a failure. Opens the circuit when the threshold is reached or when
   * the failed call was a HALF_OPEN probe.
   */
  private recordFailure(circuit: CircuitState): void {
    const threshold = circuit.options.failureThreshold ?? JobCircuitBreaker.DEFAULT_FAILURE_THRESHOLD;
    const recoveryTimeout = circuit.options.recoveryTimeout ?? JobCircuitBreaker.DEFAULT_RECOVERY_TIMEOUT_MS;
    const now = Date.now();

    circuit.failureCount++;
    circuit.lastFailureTime = now;

    if (circuit.state === 'HALF_OPEN' || circuit.failureCount >= threshold) {
      circuit.state = 'OPEN';
      circuit.nextRetryTime = now + recoveryTimeout;
    }
  }
}
/** Per-service bookkeeping kept by JobCircuitBreaker. */
interface CircuitState {
/** CLOSED: calls run; OPEN: calls are rejected; HALF_OPEN: a call is let through after the recovery timeout. */
state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
/** Failures counted since the last success (reset to 0 on success). */
failureCount: number;
/** Total number of successful calls recorded. */
successCount: number;
/** `Date.now()` timestamp of the most recent failure; 0 when none was recorded. */
lastFailureTime: number;
/** `Date.now()` timestamp before which an OPEN circuit rejects calls. */
nextRetryTime: number;
/** Thresholds captured when the circuit was created. */
options: CircuitBreakerOptions;
}
/** Optional tuning values for a circuit created by JobCircuitBreaker. */
interface CircuitBreakerOptions {
/** Failure count at which the circuit opens; JobCircuitBreaker defaults it to 5. */
failureThreshold?: number;
/** Milliseconds an open circuit rejects calls before allowing a new one; JobCircuitBreaker defaults it to 60000. */
recoveryTimeout?: number;
}Umfassendes Monitoring ist entscheidend für die Verwaltung und Optimierung von Queue-Systemen.
import { BadRequestException, Body, Controller, DefaultValuePipe, Delete, Get, Param, ParseIntPipe, Post, Query } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
// Queue Monitoring Service
/**
 * Aggregates queue statistics, performance metrics and derived indicators
 * (alerts, health scores, recommendations) for the monitoring endpoints.
 */
@Injectable()
export class QueueMonitoringService {
  constructor(
    private jobService: JobService,
    private metricsService: MetricsService,
  ) {}

  /**
   * Builds an overview across all queues. Alerts and summary are derived from
   * the same statistics snapshot that is returned in `queues`.
   */
  async getComprehensiveQueueStatus(): Promise<QueueOverview> {
    const [allStats, systemMetrics] = await Promise.all([
      this.jobService.getAllQueueStats(),
      this.getSystemMetrics(),
    ]);

    return {
      timestamp: new Date(),
      queues: allStats,
      systemMetrics,
      alerts: this.checkForAlerts(allStats),
      summary: this.calculateSummary(allStats),
    };
  }

  /**
   * Builds the detail view for one queue. Health score and recommendations
   * are computed from the statistics and metrics returned in the same
   * response, so the values are always consistent with each other.
   */
  async getDetailedQueueInfo(queueName: string): Promise<DetailedQueueInfo> {
    const [stats, recentJobs, performance] = await Promise.all([
      this.jobService.getQueueStats(queueName),
      this.getRecentJobs(queueName, 50),
      this.getQueuePerformanceMetrics(queueName),
    ]);

    return {
      queueName,
      stats,
      recentJobs,
      performance,
      healthScore: this.calculateQueueHealthScore(stats, performance),
      recommendations: await this.getQueueRecommendations(queueName, stats, performance),
    };
  }

  /**
   * Returns the jobs of a queue within `timeRange`, optionally filtered by
   * `status`, together with statistics computed over exactly those jobs.
   */
  async getJobHistory(
    queueName: string,
    timeRange: { start: Date; end: Date },
    status?: string
  ): Promise<JobHistoryData> {
    const jobs = await this.jobService.getJobHistory(queueName, timeRange, status);

    return {
      jobs,
      statistics: {
        totalJobs: jobs.length,
        successRate: this.calculateSuccessRate(jobs),
        averageProcessingTime: this.calculateAverageProcessingTime(jobs),
        peakHours: this.identifyPeakHours(jobs),
      },
    };
  }

  /**
   * Loads the metrics of the last 24 hours for one queue.
   *
   * Public because the monitoring controller calls it directly for its
   * `:queueName/metrics` endpoint.
   */
  async getQueuePerformanceMetrics(queueName: string): Promise<PerformanceMetrics> {
    const last24Hours = new Date(Date.now() - 24 * 60 * 60 * 1000);

    // The four lookups are independent of each other, so run them concurrently.
    const [averageProcessingTime, throughput, errorRate, trends] = await Promise.all([
      this.metricsService.getAverageProcessingTime(queueName, last24Hours),
      this.metricsService.getThroughput(queueName, last24Hours),
      this.metricsService.getErrorRate(queueName, last24Hours),
      this.metricsService.getTrends(queueName, last24Hours),
    ]);

    return { averageProcessingTime, throughput, errorRate, trends };
  }

  /** Collects process-level figures plus the Redis connection and worker counts. */
  private async getSystemMetrics(): Promise<SystemMetrics> {
    const [redisConnections, activeWorkers] = await Promise.all([
      this.getRedisConnectionCount(),
      this.getActiveWorkerCount(),
    ]);

    return {
      memory: process.memoryUsage(),
      cpu: process.cpuUsage(),
      uptime: process.uptime(),
      redisConnections,
      activeWorkers,
    };
  }

  /**
   * Derives alerts from a statistics snapshot:
   * failure rate above 10% (HIGH), more than 1000 waiting jobs (MEDIUM),
   * more than 50 active jobs (LOW).
   */
  private checkForAlerts(allStats: QueueStats[]): Alert[] {
    const alerts: Alert[] = [];

    for (const queue of allStats) {
      // High failure rate alert; a queue without finished jobs counts as 0%.
      const finished = queue.completed + queue.failed;
      const failureRate = finished > 0 ? queue.failed / finished : 0;
      if (failureRate > 0.1) { // > 10% failure rate
        alerts.push({
          type: 'HIGH_FAILURE_RATE',
          severity: 'HIGH',
          queue: queue.queueName,
          message: `High failure rate detected: ${(failureRate * 100).toFixed(1)}%`,
          timestamp: new Date(),
        });
      }

      // Queue backlog alert
      if (queue.waiting > 1000) {
        alerts.push({
          type: 'HIGH_BACKLOG',
          severity: 'MEDIUM',
          queue: queue.queueName,
          message: `Large queue backlog: ${queue.waiting} jobs waiting`,
          timestamp: new Date(),
        });
      }

      // Many concurrently active jobs (this does not detect stalled jobs)
      if (queue.active > 50) {
        alerts.push({
          type: 'MANY_ACTIVE_JOBS',
          severity: 'LOW',
          queue: queue.queueName,
          message: `Many active jobs: ${queue.active} currently processing`,
          timestamp: new Date(),
        });
      }
    }

    return alerts;
  }

  /** Sums the per-queue counters and adds the overall health score. */
  private calculateSummary(allStats: QueueStats[]): QueueSummary {
    return {
      totalQueues: allStats.length,
      totalWaiting: allStats.reduce((sum, q) => sum + q.waiting, 0),
      totalActive: allStats.reduce((sum, q) => sum + q.active, 0),
      totalCompleted: allStats.reduce((sum, q) => sum + q.completed, 0),
      totalFailed: allStats.reduce((sum, q) => sum + q.failed, 0),
      overallHealthScore: this.calculateOverallHealthScore(allStats),
    };
  }

  /** Placeholder: currently always returns an empty list. */
  private async getRecentJobs(queueName: string, limit: number): Promise<JobInfo[]> {
    // Implementation would fetch recent jobs from the queue
    return [];
  }

  /**
   * Scores a queue from 0 (worst) to 100 (best).
   *
   * Deductions: `errorRate * 50`, up to 30 points for the share of waiting
   * jobs, and a flat 20 points when the average processing time exceeds 30
   * seconds.
   * NOTE(review): the weighting assumes `errorRate` is a fraction in [0, 1] — confirm.
   */
  private calculateQueueHealthScore(stats: QueueStats, performance: PerformanceMetrics): number {
    let score = 100;

    // Reduce score based on error rate
    score -= performance.errorRate * 50;

    // Reduce score based on queue backlog (+1 avoids division by zero)
    const backlogRatio = stats.waiting / (stats.waiting + stats.active + 1);
    score -= backlogRatio * 30;

    // Reduce score based on processing time
    if (performance.averageProcessingTime > 30000) { // > 30 seconds
      score -= 20;
    }

    return Math.max(0, Math.min(100, score));
  }

  /** Turns threshold violations in stats and metrics into textual recommendations. */
  private async getQueueRecommendations(
    queueName: string,
    stats: QueueStats,
    performance: PerformanceMetrics
  ): Promise<string[]> {
    const recommendations: string[] = [];

    if (stats.waiting > 100) {
      recommendations.push('Consider adding more workers to reduce queue backlog');
    }
    if (performance.errorRate > 0.05) {
      recommendations.push('Investigate and fix common failure causes');
    }
    if (performance.averageProcessingTime > 60000) {
      recommendations.push('Optimize job processing logic to reduce execution time');
    }
    if (stats.failed > 50) {
      recommendations.push('Review failed jobs and implement better error handling');
    }

    return recommendations;
  }

  /** Percentage (0-100) of jobs in state `completed`; 0 for an empty list. */
  private calculateSuccessRate(jobs: JobInfo[]): number {
    if (jobs.length === 0) return 0;
    const successful = jobs.filter(job => job.state === 'completed').length;
    return (successful / jobs.length) * 100;
  }

  /**
   * Mean of `processedAt - createdAt` in milliseconds over completed jobs
   * that carry both timestamps; 0 when there are none.
   * NOTE(review): if `processedAt` marks the START of processing, this value
   * is queue waiting time rather than execution time — confirm the intended
   * meaning.
   */
  private calculateAverageProcessingTime(jobs: JobInfo[]): number {
    const completedJobs = jobs.filter(job => job.state === 'completed' && job.processedAt && job.createdAt);
    if (completedJobs.length === 0) return 0;

    const totalTime = completedJobs.reduce((sum, job) => {
      return sum + (job.processedAt!.getTime() - job.createdAt.getTime());
    }, 0);
    return totalTime / completedJobs.length;
  }

  /**
   * Returns a 24-element array where index `h` holds the number of jobs
   * created during hour `h` (local time of the server process). The hours are
   * not ranked or filtered.
   */
  private identifyPeakHours(jobs: JobInfo[]): number[] {
    const hourCounts = new Array(24).fill(0);
    jobs.forEach(job => {
      const hour = job.createdAt.getHours();
      hourCounts[hour]++;
    });
    return hourCounts;
  }

  /**
   * 100 minus the failure percentage across all queues; 100 when no job has
   * finished yet.
   */
  private calculateOverallHealthScore(allStats: QueueStats[]): number {
    if (allStats.length === 0) return 100;

    const totalJobs = allStats.reduce((sum, q) => sum + q.completed + q.failed, 0);
    if (totalJobs === 0) return 100;

    const totalFailed = allStats.reduce((sum, q) => sum + q.failed, 0);
    const errorRate = totalFailed / totalJobs;
    return Math.max(0, 100 - (errorRate * 100));
  }

  /** Placeholder: currently always returns 5. */
  private async getRedisConnectionCount(): Promise<number> {
    // Implementation would check Redis connection count
    return 5;
  }

  /** Placeholder: currently always returns 10. */
  private async getActiveWorkerCount(): Promise<number> {
    // Implementation would check active worker count
    return 10;
  }
}
// Monitoring Controller
/**
 * REST endpoints under `admin/queues` for inspecting and managing queues.
 *
 * NOTE(review): no guard or other access control is applied in the visible
 * code — confirm these administrative routes are protected elsewhere.
 */
@Controller('admin/queues')
export class QueueMonitoringController {
  constructor(
    private queueMonitoringService: QueueMonitoringService,
    private jobService: JobService,
  ) {}

  /** GET `overview`: status across all queues. */
  @Get('overview')
  async getQueueOverview(): Promise<QueueOverview> {
    return await this.queueMonitoringService.getComprehensiveQueueStatus();
  }

  /** GET `:queueName`: detail view of one queue. */
  @Get(':queueName')
  async getQueueDetails(@Param('queueName') queueName: string): Promise<DetailedQueueInfo> {
    return await this.queueMonitoringService.getDetailedQueueInfo(queueName);
  }

  /**
   * GET `:queueName/jobs`: pages through the jobs of a queue.
   *
   * Query values arrive as strings, so `limit` and `offset` are converted by
   * pipes (defaults 50 and 0). A non-numeric value is rejected with HTTP 400
   * instead of being passed on as a string.
   */
  @Get(':queueName/jobs')
  async getQueueJobs(
    @Param('queueName') queueName: string,
    @Query('status') status?: string,
    @Query('limit', new DefaultValuePipe(50), ParseIntPipe) limit: number = 50,
    @Query('offset', new DefaultValuePipe(0), ParseIntPipe) offset: number = 0
  ): Promise<JobInfo[]> {
    return await this.jobService.getQueueJobs(queueName, {
      status,
      limit,
      offset,
    });
  }

  /**
   * GET `:queueName/history`: job history between `start` and `end`.
   *
   * @throws BadRequestException when `start` or `end` is missing or cannot be
   *   parsed as a date
   */
  @Get(':queueName/history')
  async getJobHistory(
    @Param('queueName') queueName: string,
    @Query('start') start: string,
    @Query('end') end: string,
    @Query('status') status?: string
  ): Promise<JobHistoryData> {
    const timeRange = {
      start: new Date(start),
      end: new Date(end),
    };

    // `new Date(...)` never throws; invalid input yields a date whose time is NaN.
    if (Number.isNaN(timeRange.start.getTime()) || Number.isNaN(timeRange.end.getTime())) {
      throw new BadRequestException('Query parameters "start" and "end" must be valid dates');
    }

    return await this.queueMonitoringService.getJobHistory(queueName, timeRange, status);
  }

  /** POST `:queueName/pause`: pauses the queue. */
  @Post(':queueName/pause')
  async pauseQueue(@Param('queueName') queueName: string): Promise<{ message: string }> {
    await this.jobService.pauseQueue(queueName);
    return { message: `Queue ${queueName} paused successfully` };
  }

  /** POST `:queueName/resume`: resumes the queue. */
  @Post(':queueName/resume')
  async resumeQueue(@Param('queueName') queueName: string): Promise<{ message: string }> {
    await this.jobService.resumeQueue(queueName);
    return { message: `Queue ${queueName} resumed successfully` };
  }

  /** POST `:queueName/jobs/:jobId/retry`: re-queues a job. */
  @Post(':queueName/jobs/:jobId/retry')
  async retryJob(
    @Param('queueName') queueName: string,
    @Param('jobId') jobId: string
  ): Promise<{ message: string }> {
    await this.jobService.retryJob(queueName, jobId);
    return { message: `Job ${jobId} queued for retry` };
  }

  /**
   * DELETE `:queueName/jobs/:jobId`: cancels a job. The response message
   * tells whether the job was cancelled or was not found / already completed;
   * both cases answer with a success status.
   */
  @Delete(':queueName/jobs/:jobId')
  async cancelJob(
    @Param('queueName') queueName: string,
    @Param('jobId') jobId: string
  ): Promise<{ message: string }> {
    const cancelled = await this.jobService.cancelJob(queueName, jobId);
    const message = cancelled
      ? `Job ${jobId} cancelled successfully`
      : `Job ${jobId} not found or already completed`;
    return { message };
  }

  /** DELETE `:queueName/failed`: removes the failed jobs of a queue. */
  @Delete(':queueName/failed')
  async clearFailedJobs(@Param('queueName') queueName: string): Promise<{ message: string }> {
    const count = await this.jobService.clearFailedJobs(queueName);
    return { message: `Cleared ${count} failed jobs from ${queueName}` };
  }

  /**
   * GET `:queueName/metrics`: performance metrics of a queue.
   *
   * NOTE(review): `period` is accepted but not forwarded; the service call
   * below takes only the queue name.
   */
  @Get(':queueName/metrics')
  async getQueueMetrics(
    @Param('queueName') queueName: string,
    @Query('period') period: string = '24h'
  ): Promise<PerformanceMetrics> {
    return await this.queueMonitoringService.getQueuePerformanceMetrics(queueName);
  }
}
// Types and Interfaces for Monitoring
/** Result of `QueueMonitoringService.getComprehensiveQueueStatus()`. */
interface QueueOverview {
/** Moment the overview was assembled. */
timestamp: Date;
/** Statistics of every queue, as returned by the job service. */
queues: QueueStats[];
/** Process-level and infrastructure figures. */
systemMetrics: SystemMetrics;
/** Alerts derived from the queue statistics. */
alerts: Alert[];
/** Totals across all queues. */
summary: QueueSummary;
}
/** Result of `QueueMonitoringService.getDetailedQueueInfo()` for a single queue. */
interface DetailedQueueInfo {
/** Name of the queue the data belongs to. */
queueName: string;
/** Current counters of the queue. */
stats: QueueStats;
/** Recent jobs of the queue (the service requests up to 50). */
recentJobs: JobInfo[];
/** Metrics covering the last 24 hours. */
performance: PerformanceMetrics;
/** Score clamped to the range 0-100; higher is better. */
healthScore: number;
/** Human-readable suggestions derived from stats and performance. */
recommendations: string[];
}
/** Result of `QueueMonitoringService.getJobHistory()`. */
interface JobHistoryData {
/** Jobs returned by the job service for the requested range and status. */
jobs: JobInfo[];
/** Figures computed over `jobs`. */
statistics: {
/** Number of entries in `jobs`. */
totalJobs: number;
/** Percentage (0-100) of jobs in state `completed`. */
successRate: number;
/** Mean of `processedAt - createdAt` in milliseconds over completed jobs. */
averageProcessingTime: number;
/** 24 entries; index `h` holds the number of jobs created in hour `h`. */
peakHours: number[];
};
}
/** Process and infrastructure figures included in the queue overview. */
interface SystemMetrics {
/** Value of `process.memoryUsage()`. */
memory: NodeJS.MemoryUsage;
/** Value of `process.cpuUsage()`. */
cpu: NodeJS.CpuUsage;
/** Value of `process.uptime()` (seconds). */
uptime: number;
/** Redis connection count as reported by the monitoring service. */
redisConnections: number;
/** Active worker count as reported by the monitoring service. */
activeWorkers: number;
}
/** A single monitoring alert for one queue. */
interface Alert {
/** Alert identifier; the monitoring service emits `HIGH_FAILURE_RATE`, `HIGH_BACKLOG` and `MANY_ACTIVE_JOBS`. */
type: string;
/** Urgency of the alert. */
severity: 'LOW' | 'MEDIUM' | 'HIGH';
/** Name of the affected queue. */
queue: string;
/** Human-readable description including the measured value. */
message: string;
/** Moment the alert was created. */
timestamp: Date;
}
/** Totals across all queues, part of the queue overview. */
interface QueueSummary {
/** Number of queues included. */
totalQueues: number;
/** Sum of waiting jobs over all queues. */
totalWaiting: number;
/** Sum of active jobs over all queues. */
totalActive: number;
/** Sum of completed jobs over all queues. */
totalCompleted: number;
/** Sum of failed jobs over all queues. */
totalFailed: number;
/** 100 minus the failure percentage across all queues (0-100). */
overallHealthScore: number;
}
/** Description of a single job as exposed by the monitoring API. */
interface JobInfo {
/** Job identifier. */
id: string;
/** Job name. */
name: string;
/** Job state; the statistics compare it against `'completed'`. */
state: string;
/** Job payload. */
data: any;
/** Progress value of the job — presumably 0-100; confirm against the producer of this type. */
progress: number;
/** Creation time; used for the per-hour histogram and as start of the processing-time calculation. */
createdAt: Date;
/** Used as end of the processing-time calculation — TODO confirm whether this marks start or end of processing. */
processedAt?: Date;
/** Presumably the time the job finished — not read in the visible code. */
finishedAt?: Date;
/** Presumably the failure message of a failed job — not read in the visible code. */
failedReason?: string;
/** Presumably the number of attempts made — not read in the visible code. */
attempts: number;
}
/** Metrics of one queue, loaded by the monitoring service for the last 24 hours. */
interface PerformanceMetrics {
/** Average processing time; compared against millisecond thresholds (30000, 60000). */
averageProcessingTime: number;
/** Throughput as reported by the metrics service — unit not visible here. */
throughput: number;
/** Error rate; compared against 0.05, so presumably a fraction in [0, 1] — confirm. */
errorRate: number;
/** Time series as reported by the metrics service. */
trends: {
processing_time: Array<{ timestamp: Date; value: number }>;
throughput: Array<{ timestamp: Date; value: number }>;
error_rate: Array<{ timestamp: Date; value: number }>;
};
}
// Real-time Monitoring with WebSockets
@Injectable()
export class RealTimeQueueMonitoring {
// Receives the monitoring service and the `monitoring` queue and immediately
// starts the periodic monitoring, i.e. the timer begins running as soon as
// the instance is constructed.
// NOTE(review): `monitoringQueue` is injected but not read anywhere in the
// visible part of this class.
constructor(
private queueMonitoringService: QueueMonitoringService,
@InjectQueue('monitoring') private monitoringQueue: Queue,
) {
this.startRealTimeMonitoring();
}
/** Handle of the running monitoring interval; undefined while monitoring is stopped. */
private monitoringTimer?: ReturnType<typeof setInterval>;

/** True while a monitoring tick is still awaiting its data. */
private monitoringTickInFlight = false;

/**
 * Starts the monitoring loop, which runs one tick every 5 seconds.
 * Calling it again while the loop is running has no effect.
 */
private startRealTimeMonitoring(): void {
  if (this.monitoringTimer !== undefined) {
    return;
  }

  // Setup real-time monitoring that pushes updates via WebSocket
  this.monitoringTimer = setInterval(() => {
    void this.runMonitoringTick();
  }, 5000); // Update every 5 seconds
}

/**
 * One monitoring pass: loads the queue overview, broadcasts it and forwards
 * alerts of severity HIGH. A tick is skipped while the previous one is still
 * running, so slow status queries cannot pile up. Errors are logged and never
 * escape, because nothing awaits the interval callback.
 */
private async runMonitoringTick(): Promise<void> {
  if (this.monitoringTickInFlight) {
    return;
  }
  this.monitoringTickInFlight = true;

  try {
    const overview = await this.queueMonitoringService.getComprehensiveQueueStatus();

    // Emit to connected WebSocket clients
    this.broadcastQueueUpdate(overview);

    // Check for critical alerts
    const criticalAlerts = overview.alerts.filter(alert => alert.severity === 'HIGH');
    if (criticalAlerts.length > 0) {
      this.sendCriticalAlerts(criticalAlerts);
    }
  } catch (error) {
    console.error('Real-time monitoring error:', error);
  } finally {
    this.monitoringTickInFlight = false;
  }
}

/**
 * NestJS lifecycle hook: stops the monitoring loop when the module is
 * destroyed, so the timer does not keep the process alive or run against
 * closed connections.
 */
onModuleDestroy(): void {
  if (this.monitoringTimer !== undefined) {
    clearInterval(this.monitoringTimer);
    this.monitoringTimer = undefined;
  }
}
// Placeholder for pushing the overview to connected clients: currently it
// only writes a fixed message to the console and does not use `overview`.
private broadcastQueueUpdate(overview: QueueOverview): void {
// Implementation would use WebSocket gateway to broadcast updates
console.log('Broadcasting queue update to connected clients');
}
// Placeholder for immediate alert delivery: currently it only writes the
// given alerts to the console.
private sendCriticalAlerts(alerts: Alert[]): void {
// Implementation would send immediate notifications
console.log('Sending critical alerts:', alerts);
}
}

Task Scheduling und Background Jobs sind fundamentale Bausteine für skalierbare NestJS-Anwendungen. Die Kombination aus zeitgesteuerten Cron Jobs und robusten Queue-basierten Background Jobs ermöglicht es, komplexe Workflows zu orchestrieren, System-Performance zu optimieren und Benutzererfahrungen durch asynchrone Verarbeitung zu verbessern. Mit umfassendem Monitoring und intelligenten Retry-Mechanismen können Sie zuverlässige und wartbare Task-Processing-Systeme aufbauen.