25 Task Scheduling und Background Jobs

Task Scheduling und Background Job Processing sind essentiell für moderne Webanwendungen, die zeitgesteuerte Aufgaben, asynchrone Verarbeitung und Load-Balancing benötigen. NestJS bietet umfassende Unterstützung für Cron Jobs, Queue-basierte Background Jobs und robuste Job Processing-Mechanismen. Dieses Kapitel zeigt, wie Sie skalierbare und zuverlässige Task-Processing-Systeme implementieren.

25.1 Cron Jobs mit @nestjs/schedule

Das Paket @nestjs/schedule ermöglicht es, zeitgesteuerte Aufgaben mit Cron-Syntax und anderen Scheduling-Optionen zu implementieren.

25.1.1 Grundlegendes Scheduling Setup

import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { TasksService } from './tasks.service';

/**
 * Feature module that switches on NestJS scheduling and provides
 * {@link TasksService}, whose decorated methods are the scheduled jobs.
 */
@Module({
  imports: [
    // `ScheduleModule.forRoot()` only registers the scheduler and discovers
    // `@Cron()`, `@Interval()` and `@Timeout()` handlers. It has no options
    // for a global timezone, a global error handler, a concurrency limit or
    // graceful shutdown, so those concerns are handled where they apply:
    //   - timezone:       per job, via the `timeZone` option of `@Cron()`
    //   - error handling: inside each handler (try/catch + logging)
    ScheduleModule.forRoot(),
  ],
  providers: [TasksService],
})
export class TasksModule {}

25.1.2 Comprehensive Cron Jobs Implementation

import { promises as fs } from 'fs';
import * as path from 'path';

import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression, Interval, Timeout, SchedulerRegistry } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';

/**
 * Central home for the application's scheduled work.
 *
 * The service contains:
 *  - declarative jobs (`@Cron`, `@Interval`, `@Timeout`),
 *  - an API to add/remove cron jobs and intervals at runtime,
 *  - the private task implementations the jobs delegate to, and
 *  - in-memory execution metrics per named task.
 *
 * Scheduled handlers never let an error escape: failures are recorded in the
 * metrics and logged, because a rejection thrown out of a timer callback has
 * no caller that could handle it.
 */
@Injectable()
export class TasksService {
  private readonly logger = new Logger(TasksService.name);
  private taskMetrics = new Map<string, TaskMetrics>();

  // NOTE(review): `Report` is declared as an interface further down in this
  // file, but `@InjectRepository()` needs an entity class at runtime. Confirm
  // where the `User`, `Order` and `Report` entities are imported from.
  constructor(
    @InjectRepository(User) private userRepository: Repository<User>,
    @InjectRepository(Order) private orderRepository: Repository<Order>,
    @InjectRepository(Report) private reportRepository: Repository<Report>,
    private schedulerRegistry: SchedulerRegistry,
    private emailService: EmailService,
    private analyticsService: AnalyticsService,
  ) {}

  // ---------------------------------------------------------------------
  // Cron jobs
  // ---------------------------------------------------------------------

  /** Runs every day at 02:00 (Europe/Berlin): sessions, temp files, logs. */
  @Cron('0 2 * * *', {
    name: 'daily-cleanup',
    timeZone: 'Europe/Berlin',
  })
  async handleDailyCleanup(): Promise<void> {
    await this.runTrackedTask(
      'daily-cleanup',
      {
        started: 'Starting daily cleanup task',
        completed: 'Daily cleanup task completed successfully',
        failed: 'Daily cleanup task failed',
      },
      async () => {
        await this.cleanupExpiredSessions();
        await this.cleanupTempFiles();
        await this.archiveOldLogs();
      },
    );
  }

  /** Runs at the top of every hour and reports on the preceding 60 minutes. */
  @Cron(CronExpression.EVERY_HOUR, {
    name: 'hourly-analytics',
  })
  async handleHourlyAnalytics(): Promise<void> {
    await this.runTrackedTask(
      'hourly-analytics',
      {
        started: 'Starting hourly analytics task',
        completed: 'Hourly analytics task completed',
        failed: 'Hourly analytics task failed',
      },
      async () => {
        const now = new Date();
        const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000);

        const userMetrics = await this.aggregateUserActivity(oneHourAgo, now);
        const orderMetrics = await this.processOrderStatistics(oneHourAgo, now);

        await this.generatePerformanceReport(userMetrics, orderMetrics, oneHourAgo);
      },
    );
  }

  /** Runs every Monday at 09:00 (Europe/Berlin) and mails the weekly reports. */
  @Cron('0 9 * * 1', {
    name: 'weekly-reports',
    timeZone: 'Europe/Berlin',
  })
  async generateWeeklyReports(): Promise<void> {
    await this.runTrackedTask(
      'weekly-reports',
      {
        started: 'Starting weekly report generation',
        completed: 'Weekly reports generated and distributed',
        failed: 'Weekly report generation failed',
      },
      async () => {
        const now = new Date();
        const oneWeekAgo = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);

        // The four reports are independent of each other, so build them in parallel.
        const reports = await Promise.all([
          this.generateUserActivityReport(oneWeekAgo, now),
          this.generateSalesReport(oneWeekAgo, now),
          this.generatePerformanceReport(null, null, oneWeekAgo),
          this.generateErrorReport(oneWeekAgo, now),
        ]);

        await this.distributeReports(reports);
      },
    );
  }

  /** Runs at 03:00 on the first day of every month. */
  @Cron('0 3 1 * *', {
    name: 'monthly-maintenance',
  })
  async performMonthlyMaintenance(): Promise<void> {
    await this.runTrackedTask(
      'monthly-maintenance',
      {
        started: 'Starting monthly maintenance',
        completed: 'Monthly maintenance completed',
        failed: 'Monthly maintenance failed',
      },
      async () => {
        await this.performDatabaseMaintenance();
        await this.updateUserStatistics();
        await this.archiveOldData();
        await this.optimizeDatabaseIndexes();
      },
    );
  }

  // ---------------------------------------------------------------------
  // Interval- and timeout-based tasks
  // ---------------------------------------------------------------------

  /** Collects health metrics every 30 seconds and alerts on critical issues. */
  @Interval('system-health-check', 30000)
  async performSystemHealthCheck(): Promise<void> {
    try {
      const healthMetrics = await this.collectSystemHealthMetrics();

      if (healthMetrics.criticalIssues.length > 0) {
        await this.alertSystemAdmins(healthMetrics.criticalIssues);
      }

      await this.updateHealthDashboard(healthMetrics);
    } catch (error) {
      const cause = this.toError(error);
      this.logger.error(`System health check failed: ${cause.message}`, cause.stack);
    }
  }

  /** Re-populates caches every 5 minutes. */
  @Interval('cache-warm-up', 300000)
  async warmUpCache(): Promise<void> {
    try {
      await this.preloadPopularContent();
      await this.refreshCachedStats();

      this.logger.debug('Cache warm-up completed');
    } catch (error) {
      const cause = this.toError(error);
      this.logger.error(`Cache warm-up failed: ${cause.message}`, cause.stack);
    }
  }

  /** One-off initialisation, executed once 5 seconds after application start. */
  @Timeout('startup-tasks', 5000)
  async handleStartupTasks(): Promise<void> {
    try {
      this.logger.log('Executing startup tasks');

      await this.initializeSystemComponents();
      await this.validateSystemConfiguration();
      await this.loadInitialData();

      this.logger.log('Startup tasks completed');
    } catch (error) {
      const cause = this.toError(error);
      this.logger.error(`Startup tasks failed: ${cause.message}`, cause.stack);
    }
  }

  // ---------------------------------------------------------------------
  // Dynamic job management
  // ---------------------------------------------------------------------

  /**
   * Registers and starts a cron job at runtime.
   *
   * `CronJob` is the class exported by the `cron` package (the library
   * `@nestjs/schedule` builds on) and has to be imported in this file.
   *
   * @param name           Unique registry name; used later to remove the job.
   * @param cronExpression Cron pattern that triggers the handler.
   * @param handler        Work to run on every tick. Rejections are logged,
   *                       never propagated.
   * @param options        `timeZone` is passed to the cron job. Ticks before
   *                       `startDate` are skipped; the first tick after
   *                       `endDate` stops the job.
   * @throws If the registry rejects the job (e.g. the name is already taken).
   *         In that case the job is never started.
   */
  async addDynamicJob(
    name: string,
    cronExpression: string,
    handler: () => Promise<void>,
    options?: {
      timeZone?: string;
      startDate?: Date;
      endDate?: Date;
    }
  ): Promise<void> {
    const { timeZone, startDate, endDate } = options ?? {};

    // Created stopped (`start = false`): the job must not tick before the
    // registry has accepted it, otherwise a duplicate name would leave an
    // unreachable job running in the background.
    const job: CronJob = new CronJob(
      cronExpression,
      async () => {
        const now = Date.now();
        if (startDate && now < startDate.getTime()) {
          return; // Execution window has not opened yet.
        }
        if (endDate && now > endDate.getTime()) {
          job.stop(); // Execution window is over; no further ticks needed.
          return;
        }

        try {
          await handler();
        } catch (error) {
          const cause = this.toError(error);
          this.logger.error(`Dynamic cron job '${name}' failed: ${cause.message}`, cause.stack);
        }
      },
      null,
      false,
      timeZone,
    );

    this.schedulerRegistry.addCronJob(name, job);
    job.start();

    this.logger.log(`Dynamic cron job '${name}' added with expression '${cronExpression}'`);
  }

  /** Stops and unregisters a cron job previously added under `name`. */
  async removeDynamicJob(name: string): Promise<void> {
    this.schedulerRegistry.deleteCronJob(name);
    this.logger.log(`Dynamic cron job '${name}' removed`);
  }

  /**
   * Registers a repeating timer at runtime.
   *
   * @param name         Unique registry name; used later to remove the timer.
   * @param milliseconds Delay between two invocations.
   * @param handler      Work to run on every tick. Rejections are logged,
   *                     never propagated.
   * @throws If the registry rejects the interval (e.g. duplicate name). The
   *         timer is cleared again before the error is rethrown.
   */
  async addDynamicInterval(
    name: string,
    milliseconds: number,
    handler: () => Promise<void>
  ): Promise<void> {
    const interval = setInterval(async () => {
      try {
        await handler();
      } catch (error) {
        const cause = this.toError(error);
        this.logger.error(`Dynamic interval '${name}' failed: ${cause.message}`, cause.stack);
      }
    }, milliseconds);

    try {
      this.schedulerRegistry.addInterval(name, interval);
    } catch (error) {
      // Without this, a rejected registration would leak a timer that can
      // never be cleared through the registry.
      clearInterval(interval);
      throw error;
    }

    this.logger.log(`Dynamic interval '${name}' added with ${milliseconds}ms interval`);
  }

  /** Clears and unregisters an interval previously added under `name`. */
  async removeDynamicInterval(name: string): Promise<void> {
    this.schedulerRegistry.deleteInterval(name);
    this.logger.log(`Dynamic interval '${name}' removed`);
  }

  // ---------------------------------------------------------------------
  // Task implementations
  // ---------------------------------------------------------------------

  /** Clears token and expiry of every user whose session expiry lies in the past. */
  private async cleanupExpiredSessions(): Promise<void> {
    const expiredSessions = await this.userRepository
      .createQueryBuilder('user')
      .where('user.sessionExpiry < :now', { now: new Date() })
      .getMany();

    for (const user of expiredSessions) {
      user.sessionToken = null;
      user.sessionExpiry = null;
    }

    // Persist all changes with one call instead of one round trip per user.
    await this.userRepository.save(expiredSessions);

    this.logger.log(`Cleaned up ${expiredSessions.length} expired sessions`);
  }

  /**
   * Deletes regular files in `<cwd>/temp` that were last modified more than
   * 24 hours ago. Best effort: problems are logged as warnings, and a single
   * entry that cannot be processed does not stop the remaining ones.
   */
  private async cleanupTempFiles(): Promise<void> {
    const tempDir = path.join(process.cwd(), 'temp');
    const oneDayAgo = Date.now() - 24 * 60 * 60 * 1000;

    let entries: string[];
    try {
      entries = await fs.readdir(tempDir);
    } catch (error) {
      this.logger.warn(`Temp file cleanup failed: ${this.toError(error).message}`);
      return;
    }

    let deletedCount = 0;

    for (const entry of entries) {
      const filePath = path.join(tempDir, entry);

      try {
        const stats = await fs.stat(filePath);

        // `unlink` cannot remove directories, so only regular files qualify.
        if (stats.isFile() && stats.mtimeMs < oneDayAgo) {
          await fs.unlink(filePath);
          deletedCount++;
        }
      } catch (error) {
        this.logger.warn(
          `Could not clean up temp file '${filePath}': ${this.toError(error).message}`,
        );
      }
    }

    this.logger.log(`Cleaned up ${deletedCount} temporary files`);
  }

  /** Placeholder: would move logs older than 30 days to cold storage. */
  private async archiveOldLogs(): Promise<void> {
    const thirtyDaysAgo = new Date();
    thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);

    // Archive logs older than 30 days
    // This would typically involve moving logs to cold storage
    this.logger.log('Archived old logs');
  }

  /** Counts users that were active in, and users created in, the given period. */
  private async aggregateUserActivity(startTime: Date, endTime: Date): Promise<UserMetrics> {
    const activeUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.lastActivityAt BETWEEN :start AND :end', {
        start: startTime,
        end: endTime,
      })
      .getCount();

    const newUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.createdAt BETWEEN :start AND :end', {
        start: startTime,
        end: endTime,
      })
      .getCount();

    return {
      activeUsers,
      newUsers,
      period: { start: startTime, end: endTime },
    };
  }

  /** Loads the orders created in the period and derives count, revenue and average value. */
  private async processOrderStatistics(startTime: Date, endTime: Date): Promise<OrderMetrics> {
    const orders = await this.orderRepository
      .createQueryBuilder('order')
      .where('order.createdAt BETWEEN :start AND :end', {
        start: startTime,
        end: endTime,
      })
      .getMany();

    const totalRevenue = orders.reduce((sum, order) => sum + order.totalAmount, 0);
    // Guard against division by zero for periods without any order.
    const averageOrderValue = orders.length > 0 ? totalRevenue / orders.length : 0;

    return {
      totalOrders: orders.length,
      totalRevenue,
      averageOrderValue,
      period: { start: startTime, end: endTime },
    };
  }

  /**
   * Builds a performance report from the given metrics plus a fresh system
   * health snapshot, stores it through the report repository and returns it.
   * The reported period ends at the time of generation.
   */
  private async generatePerformanceReport(
    userMetrics: UserMetrics | null,
    orderMetrics: OrderMetrics | null,
    startTime: Date
  ): Promise<Report> {
    const report: Report = {
      id: `perf_${Date.now()}`,
      type: 'performance',
      generatedAt: new Date(),
      period: { start: startTime, end: new Date() },
      data: {
        userMetrics,
        orderMetrics,
        systemHealth: await this.collectSystemHealthMetrics(),
      },
    };

    await this.reportRepository.save(report);
    return report;
  }

  /** Wraps the user activity metrics of the period in a report (not persisted). */
  private async generateUserActivityReport(startTime: Date, endTime: Date): Promise<Report> {
    const userMetrics = await this.aggregateUserActivity(startTime, endTime);

    return {
      id: `user_activity_${Date.now()}`,
      type: 'user_activity',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: userMetrics,
    };
  }

  /** Wraps the order metrics of the period in a report (not persisted). */
  private async generateSalesReport(startTime: Date, endTime: Date): Promise<Report> {
    const orderMetrics = await this.processOrderStatistics(startTime, endTime);

    return {
      id: `sales_${Date.now()}`,
      type: 'sales',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: orderMetrics,
    };
  }

  /** Returns an error report. The figures are hard-coded sample data. */
  private async generateErrorReport(startTime: Date, endTime: Date): Promise<Report> {
    // A real implementation would collect these from logs or a monitoring system.
    const errorStats = {
      totalErrors: 150,
      criticalErrors: 5,
      warningCount: 45,
      topErrors: [
        { message: 'Database connection timeout', count: 25 },
        { message: 'External API rate limit exceeded', count: 20 },
      ],
    };

    return {
      id: `errors_${Date.now()}`,
      type: 'errors',
      generatedAt: new Date(),
      period: { start: startTime, end: endTime },
      data: errorStats,
    };
  }

  /** Sends every report, one after the other, to the fixed stakeholder list. */
  private async distributeReports(reports: Report[]): Promise<void> {
    const recipients = [
      'admin@company.com',
      'analytics@company.com',
      'management@company.com',
    ];

    for (const report of reports) {
      await this.emailService.sendReport(recipients, report);
    }

    this.logger.log(`Distributed ${reports.length} reports to ${recipients.length} recipients`);
  }

  /** Placeholder for database maintenance (e.g. VACUUM / ANALYZE on PostgreSQL). */
  private async performDatabaseMaintenance(): Promise<void> {
    this.logger.log('Database maintenance completed');
  }

  /** Logs the total user count and the number of users active in the last 30 days. */
  private async updateUserStatistics(): Promise<void> {
    const totalUsers = await this.userRepository.count();

    // "Active" means: last activity within the past 30 days. An equality
    // match against the current instant would practically always yield 0.
    const thirtyDaysAgo = new Date();
    thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);

    const activeUsers = await this.userRepository
      .createQueryBuilder('user')
      .where('user.lastActivityAt >= :since', { since: thirtyDaysAgo })
      .getCount();

    // Store in cache or statistics table
    this.logger.log(`Updated user statistics: ${totalUsers} total, ${activeUsers} active`);
  }

  /** Placeholder: would archive data older than the 2-year retention period. */
  private async archiveOldData(): Promise<void> {
    const retentionDate = new Date();
    retentionDate.setFullYear(retentionDate.getFullYear() - 2);

    // Archive old orders, logs, etc.
    this.logger.log('Old data archived successfully');
  }

  /** Placeholder for database-specific index optimisation. */
  private async optimizeDatabaseIndexes(): Promise<void> {
    this.logger.log('Database indexes optimized');
  }

  /** Takes a snapshot of this process's memory usage, CPU usage and uptime. */
  private async collectSystemHealthMetrics(): Promise<SystemHealthMetrics> {
    const memoryUsage = process.memoryUsage();
    const cpuUsage = process.cpuUsage();

    return {
      timestamp: new Date(),
      memory: {
        heapUsed: memoryUsage.heapUsed,
        heapTotal: memoryUsage.heapTotal,
        rss: memoryUsage.rss,
      },
      cpu: {
        user: cpuUsage.user,
        system: cpuUsage.system,
      },
      uptime: process.uptime(),
      criticalIssues: [], // Would be populated based on thresholds
    };
  }

  /** Mails the list of critical issues through the email service. */
  private async alertSystemAdmins(issues: string[]): Promise<void> {
    await this.emailService.sendAlert('System Critical Issues Detected', {
      issues,
      timestamp: new Date(),
    });
  }

  /** Placeholder: would push the metrics to a dashboard (WebSocket clients or cache). */
  private async updateHealthDashboard(metrics: SystemHealthMetrics): Promise<void> {
    // Update real-time dashboard with health metrics
  }

  /** Placeholder: would preload frequently accessed data into the cache. */
  private async preloadPopularContent(): Promise<void> {
    // Preload frequently accessed data into cache
  }

  /** Placeholder: would refresh cached statistics. */
  private async refreshCachedStats(): Promise<void> {
    // Refresh cached statistics
  }

  /** Placeholder: would initialise system components at startup. */
  private async initializeSystemComponents(): Promise<void> {
    // Initialize system components at startup
  }

  /** Placeholder: would validate the system configuration. */
  private async validateSystemConfiguration(): Promise<void> {
    // Validate system configuration
  }

  /** Placeholder: would load initial data into the cache. */
  private async loadInitialData(): Promise<void> {
    // Load initial data into cache
  }

  // ---------------------------------------------------------------------
  // Metrics bookkeeping
  // ---------------------------------------------------------------------

  /**
   * Shared execution wrapper for the named cron tasks: logs start and
   * completion, measures the duration, records the outcome in the task
   * metrics and logs (but does not rethrow) failures.
   *
   * @param taskName Key under which the metrics are recorded.
   * @param messages Log lines for the three phases of the run.
   * @param work     The task body.
   */
  private async runTrackedTask(
    taskName: string,
    messages: { started: string; completed: string; failed: string },
    work: () => Promise<void>,
  ): Promise<void> {
    const startTime = Date.now();

    try {
      this.logger.log(messages.started);
      await work();
      this.updateTaskMetrics(taskName, true, Date.now() - startTime);
      this.logger.log(messages.completed);
    } catch (error) {
      const cause = this.toError(error);
      this.updateTaskMetrics(taskName, false, Date.now() - startTime, cause);
      // The second argument of `Logger.error` is the stack trace, so the
      // human-readable reason belongs into the message itself.
      this.logger.error(`${messages.failed}: ${cause.message}`, cause.stack);
    }
  }

  /** Turns whatever was thrown into an `Error`, so `.message` and `.stack` are safe to read. */
  private toError(error: unknown): Error {
    return error instanceof Error ? error : new Error(String(error));
  }

  /**
   * Records one execution of `taskName`.
   *
   * @param taskName Metrics key.
   * @param success  Whether the run finished without error.
   * @param duration Run time in milliseconds.
   * @param error    Failure cause; its message is stored as `lastError`.
   */
  private updateTaskMetrics(
    taskName: string,
    success: boolean,
    duration: number,
    error?: Error
  ): void {
    const metrics: TaskMetrics = this.taskMetrics.get(taskName) ?? {
      taskName,
      executionCount: 0,
      successCount: 0,
      failureCount: 0,
      totalDuration: 0,
      averageDuration: 0,
      lastExecution: null,
      lastError: null,
    };

    metrics.executionCount++;
    metrics.totalDuration += duration;
    metrics.averageDuration = metrics.totalDuration / metrics.executionCount;
    metrics.lastExecution = new Date();

    if (success) {
      metrics.successCount++;
    } else {
      metrics.failureCount++;
      metrics.lastError = error?.message || 'Unknown error';
    }

    this.taskMetrics.set(taskName, metrics);
  }

  // ---------------------------------------------------------------------
  // Monitoring and management
  // ---------------------------------------------------------------------

  /** Returns the metrics of every task that has run at least once. */
  getTaskMetrics(): TaskMetrics[] {
    return Array.from(this.taskMetrics.values());
  }

  /** Returns the metrics of one task, or `undefined` if it has not run yet. */
  getTaskMetric(taskName: string): TaskMetrics | undefined {
    return this.taskMetrics.get(taskName);
  }

  /**
   * Lists all registered cron jobs with their running state and next run.
   *
   * NOTE(review): the accessor on the value returned by `nextDate()` depends
   * on the installed `cron` version (newer releases expose `toJSDate()`
   * instead of `toDate()`) — confirm against the project's lockfile.
   */
  async getCronJobs(): Promise<Array<{ name: string; running: boolean; nextDate?: Date }>> {
    const jobs = this.schedulerRegistry.getCronJobs();

    return Array.from(jobs, ([name, job]) => ({
      name,
      running: job.running,
      nextDate: job.nextDate()?.toDate(),
    }));
  }

  /** Lists the names of all registered intervals. */
  async getIntervals(): Promise<Array<{ name: string }>> {
    const intervals = this.schedulerRegistry.getIntervals();
    return Array.from(intervals.keys()).map(name => ({ name }));
  }
}

// Types and Interfaces
/** Running execution statistics for one named scheduled task. */
interface TaskMetrics {
  /** Name of the task; also the key in the service's metrics map. */
  taskName: string;
  /** Total number of recorded runs (successful and failed). */
  executionCount: number;
  /** Number of runs recorded as successful. */
  successCount: number;
  /** Number of runs recorded as failed. */
  failureCount: number;
  /** Sum of all recorded run durations, in milliseconds. */
  totalDuration: number;
  /** `totalDuration / executionCount`, in milliseconds. */
  averageDuration: number;
  /** Time the most recent run was recorded; `null` before the first run. */
  lastExecution: Date | null;
  /** Message of the most recent failure; `null` if no run has failed yet. */
  lastError: string | null;
}

/** User activity figures for a time period. */
interface UserMetrics {
  /** Users whose `lastActivityAt` falls inside the period. */
  activeUsers: number;
  /** Users whose `createdAt` falls inside the period. */
  newUsers: number;
  /** The period the counts refer to. */
  period: { start: Date; end: Date };
}

/** Order figures for a time period. */
interface OrderMetrics {
  /** Number of orders created inside the period. */
  totalOrders: number;
  /** Sum of `totalAmount` over those orders. */
  totalRevenue: number;
  /** `totalRevenue / totalOrders`, or 0 when there were no orders. */
  averageOrderValue: number;
  /** The period the figures refer to. */
  period: { start: Date; end: Date };
}

/** Point-in-time snapshot of the current Node.js process. */
interface SystemHealthMetrics {
  /** When the snapshot was taken. */
  timestamp: Date;
  /** Selected fields of `process.memoryUsage()`. */
  memory: {
    heapUsed: number;
    heapTotal: number;
    rss: number;
  };
  /** The `user` and `system` fields of `process.cpuUsage()`. */
  cpu: {
    user: number;
    system: number;
  };
  /** Value of `process.uptime()`. */
  uptime: number;
  /** Descriptions of critical problems; an empty array means none were detected. */
  criticalIssues: string[];
}

/** Envelope shared by all generated reports. */
interface Report {
  /** Identifier built from a type-specific prefix and the generation timestamp. */
  id: string;
  /** Report category, e.g. 'performance', 'user_activity', 'sales' or 'errors'. */
  type: string;
  /** When the report was generated. */
  generatedAt: Date;
  /** Time span the report covers. */
  period: { start: Date; end: Date };
  /** Type-specific payload; its shape depends on `type`. */
  data: any;
}

25.2 Bull Queue für Background Jobs

Bull Queue bietet robustes Background-Job-Processing mit Redis als Backend und ermöglicht so eine skalierbare und zuverlässige Aufgabenverarbeitung.

25.2.1 Bull Queue Setup und Konfiguration

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';

/**
 * Configures Bull with a Redis connection taken from the application
 * configuration, registers the application's queues and provides their
 * processors together with {@link JobService}.
 */
@Module({
  imports: [
    // Global Bull configuration, resolved once ConfigService is available.
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST', 'localhost'),
          // Values read from environment variables arrive as strings, so the
          // numeric settings are coerced explicitly.
          port: Number(configService.get('REDIS_PORT', 6379)),
          password: configService.get('REDIS_PASSWORD'),
          db: Number(configService.get('REDIS_DB', 0)),

          // Connection behaviour.
          // (`retryDelayOnFailover` is an ioredis *Cluster* option and has no
          // effect on a single-node connection, so it is not set here.)
          maxRetriesPerRequest: 3,
          enableReadyCheck: false,
          lazyConnect: true,

          // Socket options: force IPv4 and enable TCP keep-alive (30 s).
          family: 4,
          keepAlive: 30000,
        },

        // Defaults applied to every job unless overridden in `queue.add()`.
        defaultJobOptions: {
          removeOnComplete: 100, // Keep 100 completed jobs
          removeOnFail: 50,      // Keep 50 failed jobs
          attempts: 3,           // Default retry attempts
          backoff: {
            type: 'exponential',
            delay: 2000,
          },
        },

        // Advanced queue settings.
        settings: {
          stalledInterval: 30000,    // Check for stalled jobs every 30s
          maxStalledCount: 1,        // Max stalled job count
        },
      }),
      inject: [ConfigService],
    }),

    // Queue registration: one queue per kind of background work.
    BullModule.registerQueue(
      { name: 'email' },
      { name: 'image-processing' },
      { name: 'data-export' },
      { name: 'analytics' },
      { name: 'notification' },
      { name: 'cleanup' },
    ),
  ],
  providers: [
    EmailProcessor,
    ImageProcessor,
    DataExportProcessor,
    AnalyticsProcessor,
    NotificationProcessor,
    CleanupProcessor,
    JobService,
  ],
  exports: [BullModule],
})
export class QueueModule {}

25.2.2 Queue Producers (Job Creation)

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue, JobOptions } from 'bull';

/**
 * Producer-side facade for the Bull queues: creates jobs, inspects and
 * manages existing ones, and reports queue statistics.
 *
 * Priorities follow Bull's convention: 1 is the HIGHEST priority and larger
 * numbers mean lower priority.
 */
@Injectable()
export class JobService {
  constructor(
    @InjectQueue('email') private emailQueue: Queue,
    @InjectQueue('image-processing') private imageQueue: Queue,
    @InjectQueue('data-export') private exportQueue: Queue,
    @InjectQueue('analytics') private analyticsQueue: Queue,
    @InjectQueue('notification') private notificationQueue: Queue,
    @InjectQueue('cleanup') private cleanupQueue: Queue,
  ) {}

  // ---------------------------------------------------------------------
  // Email jobs
  // ---------------------------------------------------------------------

  /** Queues the welcome mail for a newly registered user. */
  async sendWelcomeEmail(userId: string, email: string): Promise<void> {
    await this.emailQueue.add('welcome-email', {
      userId,
      email,
      timestamp: new Date(),
    }, {
      priority: 5, // Medium: ahead of bulk mail, behind password resets
      delay: 1000, // 1 second delay
      attempts: 3,
    });
  }

  /** Queues a password reset mail; the job data carries a one-hour expiry. */
  async sendPasswordResetEmail(email: string, resetToken: string): Promise<void> {
    await this.emailQueue.add('password-reset', {
      email,
      resetToken,
      expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour
    }, {
      priority: 1, // Highest priority (Bull: 1 = highest)
      attempts: 5,
      backoff: {
        type: 'fixed',
        delay: 5000,
      },
    });
  }

  /** Queues one low-priority job per recipient in a single bulk call. */
  async sendBulkEmails(recipients: string[], template: string, data: any): Promise<void> {
    const jobs = recipients.map(email => ({
      name: 'bulk-email',
      data: { email, template, data },
      opts: {
        priority: 10, // Low priority: bulk mail must not delay transactional mail
        attempts: 2,
      },
    }));

    await this.emailQueue.addBulk(jobs);
  }

  // ---------------------------------------------------------------------
  // Image processing jobs
  // ---------------------------------------------------------------------

  /** Queues resizing of an uploaded image into the requested sizes. */
  async processImageUpload(
    imageId: string,
    filePath: string,
    sizes: string[]
  ): Promise<void> {
    await this.imageQueue.add('resize-images', {
      imageId,
      filePath,
      sizes,
    }, {
      attempts: 3,
      timeout: 300000, // 5 minutes timeout
    });
  }

  /** Queues one thumbnail job per image id in a single bulk call. */
  async generateImageThumbnails(imageIds: string[]): Promise<void> {
    const jobs = imageIds.map(imageId => ({
      name: 'generate-thumbnail',
      data: { imageId },
    }));

    await this.imageQueue.addBulk(jobs);
  }

  // ---------------------------------------------------------------------
  // Data export and analytics jobs
  // ---------------------------------------------------------------------

  /**
   * Queues a user data export.
   *
   * @returns The id of the created job, for later status lookups.
   */
  async exportUserData(
    userId: string,
    format: 'csv' | 'json' | 'xlsx',
    filters?: any
  ): Promise<string> {
    const job = await this.exportQueue.add('user-data-export', {
      userId,
      format,
      filters,
      requestedAt: new Date(),
    }, {
      attempts: 2,
      timeout: 600000, // 10 minutes timeout
    });

    return job.id.toString();
  }

  /** Registers a repeatable report job that fires every Monday at 09:00. */
  async scheduleWeeklyReport(reportType: string, recipients: string[]): Promise<void> {
    await this.analyticsQueue.add('weekly-report', {
      reportType,
      recipients,
    }, {
      repeat: { cron: '0 9 * * 1' }, // Every Monday at 9 AM
      attempts: 3,
    });
  }

  // ---------------------------------------------------------------------
  // Notification jobs
  // ---------------------------------------------------------------------

  /**
   * Queues a notification for delivery at `scheduledFor`. Times that are not
   * in the future result in immediate delivery.
   */
  async scheduleNotification(
    userId: string,
    message: string,
    scheduledFor: Date
  ): Promise<void> {
    const delay = scheduledFor.getTime() - Date.now();

    await this.notificationQueue.add('scheduled-notification', {
      userId,
      message,
      scheduledFor,
    }, {
      delay: delay > 0 ? delay : 0,
      attempts: 3,
    });
  }

  /** Queues push notifications in batches of 100 recipients per job. */
  async sendPushNotifications(
    userIds: string[],
    title: string,
    body: string,
    data?: any
  ): Promise<void> {
    const batchSize = 100;

    for (let i = 0; i < userIds.length; i += batchSize) {
      const batch = userIds.slice(i, i + batchSize);

      await this.notificationQueue.add('push-notification-batch', {
        userIds: batch,
        title,
        body,
        data,
      }, {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      });
    }
  }

  // ---------------------------------------------------------------------
  // Cleanup and recurring jobs
  // ---------------------------------------------------------------------

  /** Queues a one-off cleanup job named `cleanup-<type>`. */
  async scheduleCleanupTask(
    type: 'temp-files' | 'old-logs' | 'expired-sessions',
    options?: any
  ): Promise<void> {
    await this.cleanupQueue.add(`cleanup-${type}`, {
      type,
      options,
      scheduledAt: new Date(),
    }, {
      attempts: 2,
      timeout: 300000, // 5 minutes
    });
  }

  /** Registers the standard repeatable jobs (daily cleanup, hourly stats, weekly reports). */
  async setupRecurringJobs(): Promise<void> {
    // Daily cleanup
    await this.cleanupQueue.add('daily-cleanup', {}, {
      repeat: { cron: '0 2 * * *' }, // 2 AM daily
      attempts: 2,
    });

    // Hourly analytics
    await this.analyticsQueue.add('hourly-stats', {}, {
      repeat: { cron: '0 * * * *' }, // Every hour
      attempts: 3,
    });

    // Weekly reports
    await this.analyticsQueue.add('weekly-reports', {}, {
      repeat: { cron: '0 9 * * 1' }, // Monday 9 AM
      attempts: 3,
    });
  }

  // ---------------------------------------------------------------------
  // Job management
  // ---------------------------------------------------------------------

  /**
   * Looks up a job and returns a plain status summary.
   *
   * @returns The summary, or `null` if the queue has no job with that id.
   * @throws If `queueName` is not one of the known queues.
   */
  async getJobStatus(queueName: string, jobId: string): Promise<any> {
    const queue = this.getQueue(queueName);
    const job = await queue.getJob(jobId);

    if (!job) {
      return null;
    }

    return {
      id: job.id,
      name: job.name,
      data: job.data,
      progress: job.progress(),
      state: await job.getState(),
      createdAt: new Date(job.timestamp),
      processedAt: job.processedOn ? new Date(job.processedOn) : null,
      finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
      failedReason: job.failedReason,
      attempts: job.attemptsMade,
      delay: job.delay,
    };
  }

  /**
   * Removes a job from its queue.
   *
   * @returns `true` if the job existed and was removed, `false` if not found.
   */
  async cancelJob(queueName: string, jobId: string): Promise<boolean> {
    const queue = this.getQueue(queueName);
    const job = await queue.getJob(jobId);

    if (job) {
      await job.remove();
      return true;
    }

    return false;
  }

  /** Re-queues a job for another attempt; does nothing if the job is not found. */
  async retryJob(queueName: string, jobId: string): Promise<void> {
    const queue = this.getQueue(queueName);
    const job = await queue.getJob(jobId);

    if (job) {
      await job.retry();
    }
  }

  /** Pauses processing of the given queue. */
  async pauseQueue(queueName: string): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.pause();
  }

  /** Resumes processing of the given queue. */
  async resumeQueue(queueName: string): Promise<void> {
    const queue = this.getQueue(queueName);
    await queue.resume();
  }

  /**
   * Single source of truth for the name → queue mapping. Insertion order
   * defines the order used by {@link getAllQueueStats}.
   */
  private queueRegistry(): Map<string, Queue> {
    return new Map<string, Queue>([
      ['email', this.emailQueue],
      ['image-processing', this.imageQueue],
      ['data-export', this.exportQueue],
      ['analytics', this.analyticsQueue],
      ['notification', this.notificationQueue],
      ['cleanup', this.cleanupQueue],
    ]);
  }

  /**
   * Resolves a queue by name.
   *
   * @throws If no queue is registered under `queueName`.
   */
  private getQueue(queueName: string): Queue {
    const queue = this.queueRegistry().get(queueName);

    if (!queue) {
      throw new Error(`Unknown queue: ${queueName}`);
    }

    return queue;
  }

  // ---------------------------------------------------------------------
  // Queue statistics
  // ---------------------------------------------------------------------

  /**
   * Returns the job counts per state plus the paused flag for one queue.
   *
   * Uses Bull's count getters, which ask Redis for the sizes only, instead
   * of loading every job just to read the array length. The independent
   * lookups run in parallel.
   *
   * @throws If `queueName` is not one of the known queues.
   */
  async getQueueStats(queueName: string): Promise<QueueStats> {
    const queue = this.getQueue(queueName);

    const [waiting, active, completed, failed, delayed, isPaused] = await Promise.all([
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getCompletedCount(),
      queue.getFailedCount(),
      queue.getDelayedCount(),
      queue.isPaused(),
    ]);

    return {
      queueName,
      waiting,
      active,
      completed,
      failed,
      delayed,
      isPaused,
    };
  }

  /** Returns the statistics of every known queue. */
  async getAllQueueStats(): Promise<QueueStats[]> {
    const queueNames = Array.from(this.queueRegistry().keys());

    return await Promise.all(
      queueNames.map(name => this.getQueueStats(name))
    );
  }
}

/** Job counts per state for one queue, plus whether the queue is paused. */
interface QueueStats {
  /** Name of the queue the figures belong to. */
  queueName: string;
  /** Number of jobs waiting to be processed. */
  waiting: number;
  /** Number of jobs currently being processed. */
  active: number;
  /** Number of completed jobs still retained by the queue. */
  completed: number;
  /** Number of failed jobs still retained by the queue. */
  failed: number;
  /** Number of jobs scheduled for later execution. */
  delayed: number;
  /** Result of the queue's `isPaused()` check. */
  isPaused: boolean;
}

25.3 Job Processing

Job Processors definieren, wie verschiedene Arten von Jobs verarbeitet werden.

25.3.1 Comprehensive Job Processors

import { Process, Processor, OnQueueActive, OnQueueCompleted, OnQueueFailed } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { Job } from 'bull';

// Email Processor
/**
 * Worker for the `email` queue. Each `@Process()` method handles the jobs
 * with the matching name; the `@OnQueue*` hooks log the job lifecycle.
 *
 * A handler that throws marks the job as failed, which lets Bull apply the
 * job's retry settings.
 */
@Processor('email')
export class EmailProcessor {
  private readonly logger = new Logger(EmailProcessor.name);

  constructor(
    private emailService: EmailService,
    private userService: UserService,
  ) {}

  /**
   * Sends the welcome mail including the verification link.
   *
   * @throws If the user does not exist or the mail cannot be sent.
   */
  @Process('welcome-email')
  async handleWelcomeEmail(job: Job): Promise<void> {
    const { userId, email } = job.data;

    this.logger.log(`Processing welcome email for user: ${userId}`);

    try {
      const user = await this.userService.findById(userId);

      if (!user) {
        throw new Error(`User not found: ${userId}`);
      }

      // Encode the token so characters such as '+', '/' or '=' survive as a
      // query parameter.
      const token = encodeURIComponent(user.verificationToken);

      await this.emailService.sendWelcomeEmail(email, {
        name: user.name,
        verificationLink: `${process.env.APP_URL}/verify?token=${token}`,
      });

      this.logger.log(`Welcome email sent successfully to: ${email}`);
    } catch (error) {
      // A caught value is not guaranteed to be an Error instance.
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to send welcome email: ${reason}`);
      throw error;
    }
  }

  /**
   * Sends the password reset mail unless the token has already expired.
   *
   * @throws If the reset token has expired. The job is discarded first,
   *         because retrying can never succeed once the token is expired.
   */
  @Process('password-reset')
  async handlePasswordReset(job: Job): Promise<void> {
    const { email, resetToken, expiresAt } = job.data;

    // Job data is serialized on its way through Redis, so the expiry is
    // rebuilt as a Date here.
    const expiry = new Date(expiresAt);

    if (new Date() > expiry) {
      await job.discard();
      throw new Error('Reset token has expired');
    }

    await this.emailService.sendPasswordResetEmail(email, {
      resetLink: `${process.env.APP_URL}/reset-password?token=${encodeURIComponent(resetToken)}`,
      expiresAt: expiry,
    });

    this.logger.log(`Password reset email sent to: ${email}`);
  }

  /** Sends one templated mail of a bulk mailing. */
  @Process('bulk-email')
  async handleBulkEmail(job: Job): Promise<void> {
    const { email, template, data } = job.data;

    // Add small delay to prevent overwhelming email service
    await new Promise(resolve => setTimeout(resolve, 100));

    await this.emailService.sendTemplatedEmail(email, template, data);
  }

  /** Logs that a job has been picked up. */
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(`Processing email job ${job.id} of type ${job.name}`);
  }

  /** Logs that a job finished successfully. */
  @OnQueueCompleted()
  onCompleted(job: Job, result: any) {
    this.logger.log(`Email job ${job.id} completed successfully`);
  }

  /** Logs a job failure together with its reason. */
  @OnQueueFailed()
  onFailed(job: Job, err: Error) {
    this.logger.error(`Email job ${job.id} failed: ${err.message}`);
  }
}

// Image Processing Processor
/**
 * Consumer for the `image-processing` queue. Produces resized variants and
 * thumbnails of stored images and uploads them via the storage service.
 */
@Processor('image-processing')
export class ImageProcessor {
  private readonly logger = new Logger(ImageProcessor.name);

  constructor(
    private imageService: ImageService,
    private storageService: StorageService,
  ) {}

  /**
   * Handles `resize-images` jobs: creates one resized copy per requested size,
   * uploads each copy and stores the resulting list on the image record.
   *
   * Progress is reported as 10 at the start, then spread over the range 10-90
   * while iterating the sizes, and 100 once the record has been updated.
   *
   * @param job Queue job whose data carries `imageId`, `filePath` and `sizes`.
   * @throws Error when the image does not exist; failures are logged and re-thrown.
   */
  @Process('resize-images')
  async handleImageResize(job: Job): Promise<void> {
    const { imageId, filePath, sizes } = job.data;

    this.logger.log(`Processing image resize for: ${imageId}`);

    try {
      await job.progress(10);

      const sourceImage = await this.imageService.findById(imageId);
      if (!sourceImage) {
        throw new Error(`Image not found: ${imageId}`);
      }

      const variants = [];
      const variantCount = sizes.length;
      let index = 0;

      while (index < sizes.length) {
        const targetSize = sizes[index];

        // Each size contributes an equal share of the 80 points between 10 and 90.
        const percent = 10 + (index / variantCount) * 80;
        await job.progress(percent);

        const localPath = await this.imageService.resizeImage(filePath, targetSize);
        const publicUrl = await this.storageService.uploadFile(localPath, {
          folder: 'resized',
          public: true,
        });

        variants.push({ size: targetSize, url: publicUrl, path: localPath });
        index += 1;
      }

      await this.imageService.updateResizedVersions(imageId, variants);
      await job.progress(100);

      this.logger.log(`Image resize completed for: ${imageId}`);
    } catch (error) {
      this.logger.error(`Image resize failed for ${imageId}: ${error.message}`);
      throw error;
    }
  }

  /**
   * Handles `generate-thumbnail` jobs: renders a thumbnail from the image's
   * stored file, uploads it and saves the URL on the image record.
   *
   * @param job Queue job whose data carries `imageId`.
   * @throws Error when the image does not exist; failures are logged and re-thrown.
   */
  @Process('generate-thumbnail')
  async handleThumbnailGeneration(job: Job): Promise<void> {
    const imageId = job.data.imageId;

    try {
      const sourceImage = await this.imageService.findById(imageId);
      if (!sourceImage) {
        throw new Error(`Image not found: ${imageId}`);
      }

      const localPath = await this.imageService.generateThumbnail(sourceImage.filePath);
      const uploadOptions = { folder: 'thumbnails', public: true };
      const publicUrl = await this.storageService.uploadFile(localPath, uploadOptions);

      await this.imageService.updateThumbnail(imageId, publicUrl);

      this.logger.log(`Thumbnail generated for image: ${imageId}`);
    } catch (error) {
      this.logger.error(`Thumbnail generation failed: ${error.message}`);
      throw error;
    }
  }
}

// Data Export Processor
/**
 * Consumer for the `data-export` queue. Builds a user's data export, uploads
 * it to private storage and emails the user a time-limited download link.
 */
@Processor('data-export')
export class DataExportProcessor {
  /**
   * Lifetime of an export download link in seconds (24 hours). Used for both
   * the storage expiry and the expiry date quoted in the email, so the two
   * values cannot drift apart.
   */
  private static readonly EXPORT_TTL_SECONDS = 24 * 60 * 60;

  private readonly logger = new Logger(DataExportProcessor.name);

  constructor(
    private userService: UserService,
    private exportService: ExportService,
    private storageService: StorageService,
    private emailService: EmailService,
  ) {}

  /**
   * Handles `user-data-export` jobs.
   *
   * Steps, with the reported progress in parentheses: fetch the export data
   * (10), generate the export file (40), upload it (70), notify the user (90),
   * done (100).
   *
   * @param job Queue job whose data carries `userId`, `format` and `filters`.
   * @throws Error when the user does not exist; failures are logged and re-thrown.
   */
  @Process('user-data-export')
  async handleUserDataExport(job: Job): Promise<void> {
    const { userId, format, filters } = job.data;

    this.logger.log(`Starting data export for user: ${userId}`);

    try {
      // Resolve the recipient before doing any expensive work. Otherwise a
      // missing user only surfaces after the export has been generated and
      // uploaded, as an opaque TypeError on `user.email`.
      const user = await this.userService.findById(userId);
      if (!user) {
        throw new Error(`User not found: ${userId}`);
      }

      await job.progress(10);

      // Fetch user data
      const userData = await this.userService.getExportData(userId, filters);

      await job.progress(40);

      // Generate export file
      const exportFile = await this.exportService.generateExport(userData, format);

      await job.progress(70);

      // Upload to storage
      const ttlSeconds = DataExportProcessor.EXPORT_TTL_SECONDS;
      const fileUrl = await this.storageService.uploadFile(exportFile.path, {
        folder: 'exports',
        private: true,
        expiresIn: ttlSeconds,
      });

      await job.progress(90);

      // Notify user
      await this.emailService.sendDataExportReady(user.email, {
        downloadUrl: fileUrl,
        expiresAt: new Date(Date.now() + ttlSeconds * 1000),
        format,
      });

      await job.progress(100);

      this.logger.log(`Data export completed for user: ${userId}`);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Data export failed for user ${userId}: ${reason}`);
      throw error;
    }
  }
}

// Analytics Processor
/**
 * Consumer for the `analytics` queue. Collects hourly metrics and sends
 * weekly reports.
 */
@Processor('analytics')
export class AnalyticsProcessor {
  private readonly logger = new Logger(AnalyticsProcessor.name);

  constructor(
    private analyticsService: AnalyticsService,
    private reportService: ReportService,
    private emailService: EmailService,
  ) {}

  /**
   * Handles `hourly-stats` jobs: collects metrics for the last 60 minutes,
   * stores them and sends an alert email when anomalies are detected.
   *
   * @param job The queue job; its payload is not read.
   * @throws Re-throws any failure after logging it.
   */
  @Process('hourly-stats')
  async handleHourlyStats(job: Job): Promise<void> {
    this.logger.log('Processing hourly statistics');

    const windowEnd = new Date();
    const hourInMs = 60 * 60 * 1000;
    const windowStart = new Date(windowEnd.getTime() - hourInMs);

    try {
      const metrics = await this.analyticsService.collectHourlyMetrics(windowStart, windowEnd);
      await this.analyticsService.storeMetrics(metrics);

      const anomalies = await this.analyticsService.detectAnomalies(metrics);
      const hasAnomalies = anomalies.length > 0;
      if (hasAnomalies) {
        await this.emailService.sendAnomalyAlert(anomalies);
      }

      this.logger.log('Hourly statistics processed successfully');
    } catch (error) {
      this.logger.error(`Hourly stats processing failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Handles `weekly-report` jobs: generates the requested report for the seven
   * days ending now and emails it to the recipients.
   *
   * @param job Queue job whose data carries `reportType` and `recipients`.
   * @throws Re-throws any failure after logging it.
   */
  @Process('weekly-report')
  async handleWeeklyReport(job: Job): Promise<void> {
    const reportType = job.data.reportType;
    const recipients = job.data.recipients;

    this.logger.log(`Generating weekly report: ${reportType}`);

    try {
      const periodEnd = new Date();
      const weekInMs = 7 * 24 * 60 * 60 * 1000;
      const periodStart = new Date(periodEnd.getTime() - weekInMs);

      const report = await this.reportService.generateWeeklyReport(reportType, periodStart, periodEnd);
      await this.emailService.sendWeeklyReport(recipients, report);

      this.logger.log(`Weekly report sent to ${recipients.length} recipients`);
    } catch (error) {
      this.logger.error(`Weekly report generation failed: ${error.message}`);
      throw error;
    }
  }
}

// Notification Processor
/**
 * Consumer for the `notification` queue. Delivers scheduled single-user
 * notifications and batched push notifications.
 */
@Processor('notification')
export class NotificationProcessor {
  private readonly logger = new Logger(NotificationProcessor.name);

  constructor(
    private notificationService: NotificationService,
    private pushService: PushService,
  ) {}

  /**
   * Handles `scheduled-notification` jobs: sends the message to the user.
   * Running before the scheduled time only produces a warning; the
   * notification is still sent.
   *
   * @param job Queue job whose data carries `userId`, `message` and `scheduledFor`.
   */
  @Process('scheduled-notification')
  async handleScheduledNotification(job: Job): Promise<void> {
    const { userId, message, scheduledFor } = job.data;

    const dueAt = new Date(scheduledFor);
    const ranEarly = new Date() < dueAt;
    if (ranEarly) {
      this.logger.warn(`Notification for user ${userId} executed before scheduled time`);
    }

    await this.notificationService.sendNotification(userId, message);

    this.logger.log(`Scheduled notification sent to user: ${userId}`);
  }

  /**
   * Handles `push-notification-batch` jobs: sends one push message to many
   * users and logs how many deliveries succeeded and failed.
   *
   * @param job Queue job whose data carries `userIds`, `title`, `body` and `data`.
   * @throws Re-throws any failure of the batch call after logging it.
   */
  @Process('push-notification-batch')
  async handlePushNotificationBatch(job: Job): Promise<void> {
    const { userIds, title, body, data } = job.data;

    this.logger.log(`Sending push notifications to ${userIds.length} users`);

    try {
      const notification = { title, body, data };
      const results = await this.pushService.sendBatchNotifications(userIds, notification);

      const delivered = results.filter(outcome => outcome.success);
      const successCount = delivered.length;
      const failureCount = results.length - successCount;

      this.logger.log(`Push notifications sent: ${successCount} success, ${failureCount} failed`);
    } catch (error) {
      this.logger.error(`Batch push notification failed: ${error.message}`);
      throw error;
    }
  }
}

// Cleanup Processor
/**
 * Consumer for the `cleanup` queue. Every handler delegates to the cleanup
 * service, logs the outcome and re-throws failures so the queue records them.
 */
@Processor('cleanup')
export class CleanupProcessor {
  private readonly logger = new Logger(CleanupProcessor.name);

  constructor(
    private cleanupService: CleanupService,
  ) {}

  /**
   * Handles `cleanup-temp-files` jobs: removes temporary files and logs the count.
   *
   * @param job The queue job; its payload is not read.
   */
  @Process('cleanup-temp-files')
  async handleTempFileCleanup(job: Job): Promise<void> {
    this.logger.log('Starting temporary file cleanup');

    try {
      const removed = await this.cleanupService.cleanupTempFiles();
      this.logger.log(`Cleaned up ${removed} temporary files`);
    } catch (error) {
      this.logger.error(`Temp file cleanup failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Handles `cleanup-old-logs` jobs: archives old log files and logs the count.
   *
   * @param job The queue job; its payload is not read.
   */
  @Process('cleanup-old-logs')
  async handleLogCleanup(job: Job): Promise<void> {
    this.logger.log('Starting old log cleanup');

    try {
      const archived = await this.cleanupService.archiveOldLogs();
      this.logger.log(`Archived ${archived} old log files`);
    } catch (error) {
      this.logger.error(`Log cleanup failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Handles `cleanup-expired-sessions` jobs: clears expired sessions and logs
   * the count.
   *
   * @param job The queue job; its payload is not read.
   */
  @Process('cleanup-expired-sessions')
  async handleSessionCleanup(job: Job): Promise<void> {
    this.logger.log('Starting expired session cleanup');

    try {
      const cleared = await this.cleanupService.clearExpiredSessions();
      this.logger.log(`Cleared ${cleared} expired sessions`);
    } catch (error) {
      this.logger.error(`Session cleanup failed: ${error.message}`);
      throw error;
    }
  }

  /**
   * Handles `daily-cleanup` jobs: runs all cleanup tasks one after another and
   * reports progress before each of them (20, 40, 60, 80) and at the end (100).
   *
   * @param job The queue job; only used for progress reporting.
   */
  @Process('daily-cleanup')
  async handleDailyCleanup(job: Job): Promise<void> {
    this.logger.log('Starting daily cleanup tasks');

    // Each entry pairs the progress value reported before a task with the task.
    const steps: Array<[number, () => Promise<unknown>]> = [
      [20, () => this.cleanupService.cleanupTempFiles()],
      [40, () => this.cleanupService.clearExpiredSessions()],
      [60, () => this.cleanupService.archiveOldLogs()],
      [80, () => this.cleanupService.optimizeDatabase()],
    ];

    try {
      for (const [percent, runStep] of steps) {
        await job.progress(percent);
        await runStep();
      }

      await job.progress(100);

      this.logger.log('Daily cleanup completed successfully');
    } catch (error) {
      this.logger.error(`Daily cleanup failed: ${error.message}`);
      throw error;
    }
  }
}

25.4 Job Retry Mechanisms

Robuste Retry-Mechanismen sind essentiell für ein zuverlässiges Background-Job-Processing.

25.4.1 Advanced Retry Strategies

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue, JobOptions } from 'bull';

/**
 * Adds jobs to the `email` and `critical` queues with different retry
 * (backoff) configurations.
 *
 * Bull stores job options as JSON in Redis. Every option passed to
 * `queue.add` therefore has to be serialisable: a backoff must be a number or
 * a `{ type, delay }` object, never a function.
 */
@Injectable()
export class RetryService {
  constructor(
    @InjectQueue('email') private emailQueue: Queue,
    @InjectQueue('critical') private criticalQueue: Queue,
  ) {}

  /**
   * Error-aware delay calculation in the shape Bull expects of a custom
   * backoff strategy, `(attemptsMade, err) => delayMs`.
   *
   * To take effect it has to be registered once in the queue settings, e.g.
   * `settings: { backoffStrategies: { conditional: RetryService.conditionalBackoffStrategy } }`,
   * and then be referenced by name via `backoff: { type: 'conditional' }`.
   *
   * @param attemptsMade Number of attempts made so far.
   * @param err The error the last attempt failed with.
   * @returns Delay in milliseconds before the next attempt, or `-1` to stop
   *   retrying.
   */
  static conditionalBackoffStrategy(attemptsMade: number, err: Error): number {
    const reason = err?.message ?? '';

    if (reason.includes('RATE_LIMIT')) {
      return 60000; // 1 minute for rate limit errors
    }

    if (reason.includes('NETWORK_ERROR')) {
      return Math.pow(2, attemptsMade) * 1000; // Exponential for network errors
    }

    if (reason.includes('AUTH_ERROR')) {
      return -1; // Don't retry auth errors
    }

    // Default exponential backoff
    return Math.pow(2, attemptsMade) * 2000;
  }

  /**
   * Adds a job that is retried with exponentially growing delays, starting at
   * 2 seconds. Keeps the last 100 completed and the last 50 failed jobs.
   *
   * @param queueName `'email'` or `'critical'`.
   * @param jobName Name of the job type.
   * @param data Job payload.
   * @param maxAttempts Total number of attempts (default 5).
   * @throws Error when `queueName` is unknown.
   */
  async addJobWithExponentialBackoff(
    queueName: string,
    jobName: string,
    data: any,
    maxAttempts: number = 5
  ): Promise<void> {
    const queue = this.getQueue(queueName);

    await queue.add(jobName, data, {
      attempts: maxAttempts,
      backoff: {
        type: 'exponential',
        delay: 2000, // Start with 2 seconds
      },
      removeOnComplete: 100,
      removeOnFail: 50,
    });
  }

  /**
   * Adds a job that is retried with a constant delay between attempts.
   *
   * @param queueName `'email'` or `'critical'`.
   * @param jobName Name of the job type.
   * @param data Job payload.
   * @param attempts Total number of attempts (default 3).
   * @param delay Delay between attempts in milliseconds (default 5000).
   * @throws Error when `queueName` is unknown.
   */
  async addJobWithFixedDelay(
    queueName: string,
    jobName: string,
    data: any,
    attempts: number = 3,
    delay: number = 5000
  ): Promise<void> {
    const queue = this.getQueue(queueName);

    await queue.add(jobName, data, {
      attempts,
      backoff: {
        type: 'fixed',
        delay,
      },
    });
  }

  /**
   * Adds a job that uses the backoff strategy registered under the name
   * `custom`.
   *
   * NOTE(review): only `retryConfig.maxAttempts` is used here. The strategy
   * named `custom` must be registered in the queue's `backoffStrategies`
   * setting, which is not visible from this snippet; Bull rejects unknown
   * strategy names.
   *
   * @param queueName `'email'` or `'critical'`.
   * @param jobName Name of the job type.
   * @param data Job payload.
   * @param retryConfig Retry configuration; see the note above.
   * @throws Error when `queueName` is unknown.
   */
  async addJobWithCustomRetry(
    queueName: string,
    jobName: string,
    data: any,
    retryConfig: CustomRetryConfig
  ): Promise<void> {
    const queue = this.getQueue(queueName);

    await queue.add(jobName, data, {
      attempts: retryConfig.maxAttempts,
      backoff: {
        type: 'custom',
      },
    });
  }

  /**
   * Adds a job with up to 5 attempts and exponential backoff (2 second base).
   *
   * A function cannot be passed as `backoff`: it is dropped when the job
   * options are serialised, which leaves the job with no retry delay at all.
   * The built-in exponential strategy is therefore used here. For delays that
   * depend on the error, register `RetryService.conditionalBackoffStrategy`
   * in the queue settings and reference it by its registered name.
   *
   * @param queueName `'email'` or `'critical'`.
   * @param jobName Name of the job type.
   * @param data Job payload.
   * @throws Error when `queueName` is unknown.
   */
  async addJobWithConditionalRetry(
    queueName: string,
    jobName: string,
    data: any
  ): Promise<void> {
    const queue = this.getQueue(queueName);

    await queue.add(jobName, data, {
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 2000,
      },
    });
  }

  /**
   * Maps a queue name to the injected queue instance.
   *
   * @throws Error for any name other than `'email'` or `'critical'`.
   */
  private getQueue(queueName: string): Queue {
    switch (queueName) {
      case 'email': return this.emailQueue;
      case 'critical': return this.criticalQueue;
      default: throw new Error(`Unknown queue: ${queueName}`);
    }
  }
}

/**
 * Retry configuration accepted by `RetryService.addJobWithCustomRetry`.
 *
 * NOTE(review): within the visible code only `maxAttempts` is read; how the
 * remaining fields are interpreted is not shown here.
 */
interface CustomRetryConfig {
  /** Passed to the queue as the job's `attempts` option. */
  maxAttempts: number;
  /** Presumably the delay per attempt in milliseconds; not read in the visible code, so confirm. */
  delays: number[];
  /** Presumably error markers that allow a retry; not read in the visible code, so confirm. */
  retryableErrors: string[];
  /** Presumably error markers that forbid a retry; not read in the visible code, so confirm. */
  nonRetryableErrors: string[];
}

// Enhanced Job Processor with Retry Logic
/**
 * Consumer for the `enhanced-processor` queue. Shows retry-aware error
 * handling: detailed failure logging, final-failure handling (alert, failure
 * log, compensation) and a dead-letter queue for errors that must not be
 * retried.
 *
 * NOTE(review): `deadLetterQueue`, `notificationService`, `failureLogService`
 * and `compensationService` are used below but are not declared or injected in
 * this snippet. They have to be provided through the constructor before this
 * class compiles.
 */
@Processor('enhanced-processor')
export class EnhancedJobProcessor {
  private readonly logger = new Logger(EnhancedJobProcessor.name);

  /**
   * Handles `critical-operation` jobs.
   *
   * @param job Queue job whose data carries `operation` and `data`.
   * @throws Re-throws the operation's error so that Bull applies its retry
   *   rules, unless the job was discarded as non-retryable.
   */
  @Process('critical-operation')
  async handleCriticalOperation(job: Job): Promise<void> {
    const { operation, data } = job.data;

    try {
      await this.performOperation(operation, data);
    } catch (error) {
      const failure = error instanceof Error ? error : new Error(String(error));

      try {
        await this.handleJobError(job, failure);
      } catch (handlerError) {
        // A broken alert or log channel must not mask the original failure.
        const reason = handlerError instanceof Error ? handlerError.message : String(handlerError);
        this.logger.error(`Error handling for job ${job.id} failed: ${reason}`);
      }

      throw error; // Re-throw to trigger Bull's retry mechanism
    }
  }

  /**
   * Logs the failure and decides whether this was the job's last chance.
   *
   * A failure is terminal when the error is non-retryable or when the current
   * attempt was the last configured one. Non-retryable jobs are discarded, so
   * Bull does not retry them, and are copied to the dead-letter queue.
   */
  private async handleJobError(job: Job, error: Error): Promise<void> {
    // Bull increments `attemptsMade` only after the processor has thrown, so
    // the attempt that is running right now is number `attemptsMade + 1`.
    const currentAttempt = job.attemptsMade + 1;
    // Bull runs a job once when no `attempts` option is given.
    const maxAttempts = job.opts.attempts ?? 1;

    // Log detailed error information
    this.logger.error(`Job ${job.id} failed (attempt ${currentAttempt}/${maxAttempts})`, {
      jobId: job.id,
      jobName: job.name,
      error: error.message,
      stack: error.stack,
      attemptsMade: job.attemptsMade,
      currentAttempt,
      maxAttempts,
      data: job.data,
    });

    const nonRetryable = this.isNonRetryableError(error);
    const isFinalAttempt = currentAttempt >= maxAttempts;

    if (nonRetryable) {
      // Without discard() the re-thrown error would still be retried.
      await job.discard();
      await this.moveToDeadLetterQueue(job, error);
    }

    if (nonRetryable || isFinalAttempt) {
      await this.handleFinalFailure(job, error);
      return; // No retry follows, so do not announce one.
    }

    // Log retry information. This is an estimate; the actual delay is the one
    // configured through the job's `backoff` option.
    const nextDelay = this.calculateNextDelay(job.attemptsMade, error);
    this.logger.warn(`Job ${job.id} will be retried in ${nextDelay}ms`);
  }

  /**
   * Runs once a job will not be retried any more: alerts administrators for
   * jobs flagged `critical`, stores the failure and executes the job's
   * compensation action if one is given.
   */
  private async handleFinalFailure(job: Job, error: Error): Promise<void> {
    // Send alert for critical job failures
    if (job.data.critical) {
      await this.alertAdministrators(job, error);
    }

    // Store failure information for analysis
    await this.logJobFailure(job, error);

    // Attempt compensation if possible
    if (job.data.compensationAction) {
      await this.executeCompensation(job.data.compensationAction);
    }
  }

  /** Returns true when the error message contains a marker of a permanent failure. */
  private isNonRetryableError(error: Error): boolean {
    const nonRetryablePatterns = [
      'VALIDATION_ERROR',
      'AUTH_ERROR',
      'PERMISSION_DENIED',
      'RESOURCE_NOT_FOUND',
      'INVALID_INPUT',
    ];

    return nonRetryablePatterns.some(pattern =>
      error.message.includes(pattern)
    );
  }

  /**
   * Estimates the delay before the next attempt in milliseconds; used for
   * logging only.
   */
  private calculateNextDelay(attemptsMade: number, error: Error): number {
    // Custom delay calculation based on error type
    if (error.message.includes('RATE_LIMIT')) {
      return 60000; // 1 minute for rate limits
    }

    if (error.message.includes('SERVICE_UNAVAILABLE')) {
      return Math.pow(2, attemptsMade) * 5000; // Exponential with 5s base
    }

    // Default exponential backoff
    return Math.pow(2, attemptsMade) * 2000;
  }

  /** Copies the job and its error to the dead-letter queue for manual inspection. */
  private async moveToDeadLetterQueue(job: Job, error: Error): Promise<void> {
    const deadLetterData = {
      originalJob: {
        id: job.id,
        name: job.name,
        data: job.data,
        opts: job.opts,
      },
      error: {
        message: error.message,
        stack: error.stack,
      },
      failedAt: new Date(),
      attemptsMade: job.attemptsMade,
    };

    // Add to dead letter queue
    await this.deadLetterQueue.add('dead-letter', deadLetterData);

    this.logger.error(`Job ${job.id} moved to dead letter queue due to non-retryable error`);
  }

  /** Sends a critical-failure alert through the notification service. */
  private async alertAdministrators(job: Job, error: Error): Promise<void> {
    await this.notificationService.sendCriticalAlert({
      subject: 'Critical Job Failure',
      jobId: job.id,
      jobName: job.name,
      error: error.message,
      timestamp: new Date(),
    });
  }

  /** Stores detailed failure information for later analysis. */
  private async logJobFailure(job: Job, error: Error): Promise<void> {
    await this.failureLogService.log({
      jobId: job.id,
      jobName: job.name,
      jobData: job.data,
      error: error.message,
      stack: error.stack,
      attemptsMade: job.attemptsMade,
      failedAt: new Date(),
    });
  }

  /**
   * Executes the compensation action. This is best effort: a failing
   * compensation is logged but not propagated.
   */
  private async executeCompensation(compensationAction: any): Promise<void> {
    try {
      await this.compensationService.execute(compensationAction);
      this.logger.log(`Compensation executed for failed job`);
    } catch (compensationError) {
      const reason = compensationError instanceof Error
        ? compensationError.message
        : String(compensationError);
      this.logger.error(`Compensation failed: ${reason}`);
    }
  }

  /**
   * Dispatches to one of the simulated operations.
   *
   * @throws Error `Unknown operation: …` for unsupported operation names.
   */
  private async performOperation(operation: string, data: any): Promise<void> {
    switch (operation) {
      case 'api-call':
        await this.makeApiCall(data);
        break;
      case 'database-operation':
        await this.performDatabaseOperation(data);
        break;
      case 'file-processing':
        await this.processFile(data);
        break;
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }
  }

  /** Simulated API call that fails randomly in about 30% of invocations. */
  private async makeApiCall(data: any): Promise<void> {
    if (Math.random() < 0.3) {
      throw new Error('NETWORK_ERROR: Failed to connect to external API');
    }
  }

  /** Simulated database operation that fails randomly in about 20% of invocations. */
  private async performDatabaseOperation(data: any): Promise<void> {
    if (Math.random() < 0.2) {
      throw new Error('DATABASE_ERROR: Connection timeout');
    }
  }

  /** Simulated file processing that fails randomly in about 10% of invocations. */
  private async processFile(data: any): Promise<void> {
    if (Math.random() < 0.1) {
      throw new Error('FILE_ERROR: Invalid file format');
    }
  }
}

// Circuit Breaker for Job Processing
/**
 * In-memory circuit breaker keyed by service name.
 *
 * A circuit starts CLOSED. After `failureThreshold` consecutive failures it
 * becomes OPEN and rejects calls without running them until `recoveryTimeout`
 * milliseconds have passed. The next call then runs as a HALF_OPEN probe:
 * success closes the circuit, failure opens it again.
 */
@Injectable()
export class JobCircuitBreaker {
  private static readonly DEFAULT_FAILURE_THRESHOLD = 5;
  private static readonly DEFAULT_RECOVERY_TIMEOUT_MS = 60000;

  private circuits = new Map<string, CircuitState>();

  /**
   * Runs `operation` under the circuit of `serviceName`.
   *
   * @param serviceName Key identifying the circuit.
   * @param operation The protected call.
   * @param options Thresholds for this circuit. They are applied only when the
   *   circuit is created, i.e. on the first call for a given `serviceName`.
   * @returns The value `operation` resolves to.
   * @throws Error `Circuit breaker OPEN for <serviceName>` while the circuit is
   *   open; otherwise whatever `operation` throws.
   */
  async executeWithCircuitBreaker<T>(
    serviceName: string,
    operation: () => Promise<T>,
    options: CircuitBreakerOptions = {}
  ): Promise<T> {
    const circuit = this.getOrCreateCircuit(serviceName, options);

    if (circuit.state === 'OPEN') {
      if (Date.now() < circuit.nextRetryTime) {
        throw new Error(`Circuit breaker OPEN for ${serviceName}`);
      }
      // Recovery timeout elapsed: let this call through as a probe.
      circuit.state = 'HALF_OPEN';
    }

    try {
      const result = await operation();
      this.recordSuccess(circuit);
      return result;
    } catch (error) {
      this.recordFailure(circuit);
      throw error;
    }
  }

  /** Returns the circuit for `serviceName`, creating it in CLOSED state if needed. */
  private getOrCreateCircuit(serviceName: string, options: CircuitBreakerOptions): CircuitState {
    const existing = this.circuits.get(serviceName);
    if (existing) {
      return existing;
    }

    // Resolve each default individually. Spreading `options` over the defaults
    // would let an explicit `undefined` erase a default and disable the
    // breaker, because `count >= undefined` is never true.
    const created: CircuitState = {
      state: 'CLOSED',
      failureCount: 0,
      successCount: 0,
      lastFailureTime: 0,
      nextRetryTime: 0,
      options: {
        failureThreshold: options.failureThreshold ?? JobCircuitBreaker.DEFAULT_FAILURE_THRESHOLD,
        recoveryTimeout: options.recoveryTimeout ?? JobCircuitBreaker.DEFAULT_RECOVERY_TIMEOUT_MS,
      },
    };
    this.circuits.set(serviceName, created);
    return created;
  }

  /** Resets the failure streak and closes a half-open circuit. */
  private recordSuccess(circuit: CircuitState): void {
    circuit.successCount++;
    circuit.failureCount = 0;

    if (circuit.state === 'HALF_OPEN') {
      circuit.state = 'CLOSED';
    }
  }

  /** Counts a failure and opens the circuit on a failed probe or a reached threshold. */
  private recordFailure(circuit: CircuitState): void {
    circuit.failureCount++;
    circuit.lastFailureTime = Date.now();

    const probeFailed = circuit.state === 'HALF_OPEN';
    if (probeFailed || circuit.failureCount >= circuit.options.failureThreshold) {
      circuit.state = 'OPEN';
      circuit.nextRetryTime = circuit.lastFailureTime + circuit.options.recoveryTimeout;
    }
  }
}

/** Bookkeeping for one circuit of `JobCircuitBreaker`. */
interface CircuitState {
  state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
  /** Consecutive failures since the last success. */
  failureCount: number;
  /** Total number of successful calls. */
  successCount: number;
  /** `Date.now()` timestamp of the most recent failure; 0 if none. */
  lastFailureTime: number;
  /** `Date.now()` timestamp from which an open circuit admits a probe call. */
  nextRetryTime: number;
  /** Effective options with all defaults applied. */
  options: Required<CircuitBreakerOptions>;
}

/** Caller-supplied thresholds for a circuit. */
interface CircuitBreakerOptions {
  /** Consecutive failures that open the circuit (default 5). */
  failureThreshold?: number;
  /** Milliseconds an open circuit rejects calls before probing (default 60000). */
  recoveryTimeout?: number;
}

25.5 Queue Monitoring

Umfassendes Monitoring ist entscheidend für die Verwaltung und Optimierung von Queue-Systemen.

25.5.1 Monitoring Dashboard und APIs

import { Controller, Get, Post, Delete, Param, Query, Body } from '@nestjs/common';
import { Injectable } from '@nestjs/common';

// Queue Monitoring Service
/**
 * Aggregates queue statistics, performance metrics, alerts and
 * recommendations for the monitoring API.
 */
@Injectable()
export class QueueMonitoringService {
  constructor(
    private jobService: JobService,
    private metricsService: MetricsService,
  ) {}

  /**
   * Builds the overview across all queues: per-queue stats, process metrics,
   * alerts and a summary. Alerts and summary are derived from the same stats
   * snapshot, so they are consistent with each other.
   */
  async getComprehensiveQueueStatus(): Promise<QueueOverview> {
    const allStats = await this.jobService.getAllQueueStats();
    const [systemMetrics, alerts] = await Promise.all([
      this.getSystemMetrics(),
      this.checkForAlerts(allStats),
    ]);

    return {
      timestamp: new Date(),
      queues: allStats,
      systemMetrics,
      alerts,
      summary: this.calculateSummary(allStats),
    };
  }

  /**
   * Builds the detail view of one queue: stats, up to 50 recent jobs,
   * performance metrics, health score and recommendations.
   *
   * @param queueName Name of the queue to inspect.
   */
  async getDetailedQueueInfo(queueName: string): Promise<DetailedQueueInfo> {
    const [stats, recentJobs, performance] = await Promise.all([
      this.jobService.getQueueStats(queueName),
      this.getRecentJobs(queueName, 50),
      this.getQueuePerformanceMetrics(queueName),
    ]);

    // Reuse the values fetched above instead of querying them a second time.
    const healthScore = await this.calculateQueueHealthScore(queueName, stats, performance);

    return {
      queueName,
      stats,
      recentJobs,
      performance,
      healthScore,
      recommendations: await this.getQueueRecommendations(queueName, stats, performance),
    };
  }

  /**
   * Returns the jobs of a queue within a time range together with derived
   * statistics.
   *
   * @param queueName Name of the queue.
   * @param timeRange Start and end of the period.
   * @param status Optional status filter, passed through to the job service.
   */
  async getJobHistory(
    queueName: string,
    timeRange: { start: Date; end: Date },
    status?: string
  ): Promise<JobHistoryData> {
    const jobs = await this.jobService.getJobHistory(queueName, timeRange, status);

    return {
      jobs,
      statistics: {
        totalJobs: jobs.length,
        successRate: this.calculateSuccessRate(jobs),
        averageProcessingTime: this.calculateAverageProcessingTime(jobs),
        peakHours: this.identifyPeakHours(jobs),
      },
    };
  }

  /**
   * Loads the performance metrics of a queue for the last 24 hours.
   *
   * Public because the monitoring controller exposes it directly.
   *
   * @param queueName Name of the queue.
   */
  async getQueuePerformanceMetrics(queueName: string): Promise<PerformanceMetrics> {
    const last24Hours = new Date(Date.now() - 24 * 60 * 60 * 1000);

    // The four lookups are independent of each other, so run them concurrently.
    const [averageProcessingTime, throughput, errorRate, trends] = await Promise.all([
      this.metricsService.getAverageProcessingTime(queueName, last24Hours),
      this.metricsService.getThroughput(queueName, last24Hours),
      this.metricsService.getErrorRate(queueName, last24Hours),
      this.metricsService.getTrends(queueName, last24Hours),
    ]);

    return { averageProcessingTime, throughput, errorRate, trends };
  }

  /** Collects metrics of the current Node.js process plus Redis and worker counts. */
  private async getSystemMetrics(): Promise<SystemMetrics> {
    return {
      memory: process.memoryUsage(),
      cpu: process.cpuUsage(),
      uptime: process.uptime(),
      redisConnections: await this.getRedisConnectionCount(),
      activeWorkers: await this.getActiveWorkerCount(),
    };
  }

  /**
   * Derives alerts from queue statistics.
   *
   * @param allStats Stats snapshot to evaluate; fetched from the job service
   *   when omitted.
   */
  private async checkForAlerts(allStats?: QueueStats[]): Promise<Alert[]> {
    const alerts: Alert[] = [];
    const statsSnapshot = allStats ?? await this.jobService.getAllQueueStats();

    for (const queue of statsSnapshot) {
      // High failure rate alert (more than 10% of finished jobs failed)
      const finished = queue.completed + queue.failed;
      const failureRate = finished > 0 ? queue.failed / finished : 0;
      if (failureRate > 0.1) {
        alerts.push({
          type: 'HIGH_FAILURE_RATE',
          severity: 'HIGH',
          queue: queue.queueName,
          message: `High failure rate detected: ${(failureRate * 100).toFixed(1)}%`,
          timestamp: new Date(),
        });
      }

      // Queue backlog alert
      if (queue.waiting > 1000) {
        alerts.push({
          type: 'HIGH_BACKLOG',
          severity: 'MEDIUM',
          queue: queue.queueName,
          message: `Large queue backlog: ${queue.waiting} jobs waiting`,
          timestamp: new Date(),
        });
      }

      // Many concurrently active jobs. This checks the active count only; it
      // does not detect stalled jobs.
      if (queue.active > 50) {
        alerts.push({
          type: 'MANY_ACTIVE_JOBS',
          severity: 'LOW',
          queue: queue.queueName,
          message: `Many active jobs: ${queue.active} currently processing`,
          timestamp: new Date(),
        });
      }
    }

    return alerts;
  }

  /** Sums the per-queue counters and computes the overall health score. */
  private calculateSummary(allStats: QueueStats[]): QueueSummary {
    return {
      totalQueues: allStats.length,
      totalWaiting: allStats.reduce((sum, q) => sum + q.waiting, 0),
      totalActive: allStats.reduce((sum, q) => sum + q.active, 0),
      totalCompleted: allStats.reduce((sum, q) => sum + q.completed, 0),
      totalFailed: allStats.reduce((sum, q) => sum + q.failed, 0),
      overallHealthScore: this.calculateOverallHealthScore(allStats),
    };
  }

  /** Placeholder: a real implementation would fetch the most recent jobs of the queue. */
  private async getRecentJobs(queueName: string, limit: number): Promise<JobInfo[]> {
    // Implementation would fetch recent jobs from the queue
    return [];
  }

  /**
   * Computes a health score between 0 and 100 for a queue.
   *
   * Starting from 100, the score loses `errorRate * 50` points, up to 30
   * points for the share of waiting jobs and 20 points when the average
   * processing time exceeds 30 seconds.
   *
   * @param queueName Name of the queue.
   * @param stats Already loaded stats; fetched when omitted.
   * @param performance Already loaded metrics; fetched when omitted.
   */
  private async calculateQueueHealthScore(
    queueName: string,
    stats?: QueueStats,
    performance?: PerformanceMetrics
  ): Promise<number> {
    const metrics = performance ?? await this.getQueuePerformanceMetrics(queueName);
    const queueStats = stats ?? await this.jobService.getQueueStats(queueName);

    let score = 100;

    // Reduce score based on error rate
    score -= metrics.errorRate * 50;

    // Reduce score based on queue backlog (the +1 avoids division by zero)
    const backlogRatio = queueStats.waiting / (queueStats.waiting + queueStats.active + 1);
    score -= backlogRatio * 30;

    // Reduce score based on processing time
    if (metrics.averageProcessingTime > 30000) { // > 30 seconds
      score -= 20;
    }

    return Math.max(0, Math.min(100, score));
  }

  /** Turns threshold violations into human-readable recommendations. */
  private async getQueueRecommendations(
    queueName: string,
    stats: QueueStats,
    performance: PerformanceMetrics
  ): Promise<string[]> {
    const recommendations: string[] = [];

    if (stats.waiting > 100) {
      recommendations.push('Consider adding more workers to reduce queue backlog');
    }

    if (performance.errorRate > 0.05) {
      recommendations.push('Investigate and fix common failure causes');
    }

    if (performance.averageProcessingTime > 60000) {
      recommendations.push('Optimize job processing logic to reduce execution time');
    }

    if (stats.failed > 50) {
      recommendations.push('Review failed jobs and implement better error handling');
    }

    return recommendations;
  }

  /** Percentage (0-100) of jobs in state `completed`; 0 for an empty list. */
  private calculateSuccessRate(jobs: JobInfo[]): number {
    if (jobs.length === 0) return 0;
    const successful = jobs.filter(job => job.state === 'completed').length;
    return (successful / jobs.length) * 100;
  }

  /**
   * Average time in milliseconds between start (`processedAt`) and end
   * (`finishedAt`) of completed jobs. Time spent waiting in the queue is not
   * part of the processing time. Jobs lacking either timestamp are skipped;
   * returns 0 when no job qualifies.
   */
  private calculateAverageProcessingTime(jobs: JobInfo[]): number {
    const durations: number[] = [];

    for (const job of jobs) {
      if (job.state === 'completed' && job.processedAt && job.finishedAt) {
        durations.push(job.finishedAt.getTime() - job.processedAt.getTime());
      }
    }

    if (durations.length === 0) return 0;

    const totalTime = durations.reduce((sum, duration) => sum + duration, 0);
    return totalTime / durations.length;
  }

  /**
   * Returns the hour(s) of day (0-23, local time) in which most jobs were
   * created, in ascending order. Several hours are returned when they tie for
   * the maximum; an empty list is returned when there are no jobs.
   */
  private identifyPeakHours(jobs: JobInfo[]): number[] {
    const hourCounts = new Array<number>(24).fill(0);

    for (const job of jobs) {
      const hour = job.createdAt.getHours();
      hourCounts[hour] = (hourCounts[hour] ?? 0) + 1;
    }

    const busiest = Math.max(...hourCounts);
    if (busiest === 0) return [];

    const peakHours: number[] = [];
    hourCounts.forEach((count, hour) => {
      if (count === busiest) {
        peakHours.push(hour);
      }
    });

    return peakHours;
  }

  /** 100 minus the overall failure percentage; 100 when nothing has finished yet. */
  private calculateOverallHealthScore(allStats: QueueStats[]): number {
    if (allStats.length === 0) return 100;

    const totalJobs = allStats.reduce((sum, q) => sum + q.completed + q.failed, 0);
    if (totalJobs === 0) return 100;

    const totalFailed = allStats.reduce((sum, q) => sum + q.failed, 0);
    const errorRate = totalFailed / totalJobs;

    return Math.max(0, 100 - (errorRate * 100));
  }

  /** Placeholder: returns a fixed value instead of a real Redis connection count. */
  private async getRedisConnectionCount(): Promise<number> {
    // Implementation would check Redis connection count
    return 5;
  }

  /** Placeholder: returns a fixed value instead of a real worker count. */
  private async getActiveWorkerCount(): Promise<number> {
    // Implementation would check active worker count
    return 10;
  }
}

// Monitoring Controller
/**
 * Admin REST API for inspecting and controlling queues, mounted under
 * `admin/queues`.
 */
@Controller('admin/queues')
export class QueueMonitoringController {
  constructor(
    private queueMonitoringService: QueueMonitoringService,
    private jobService: JobService,
  ) {}

  /** `GET overview` — status across all queues. */
  @Get('overview')
  async getQueueOverview(): Promise<QueueOverview> {
    return await this.queueMonitoringService.getComprehensiveQueueStatus();
  }

  /** `GET :queueName` — detail view of one queue. */
  @Get(':queueName')
  async getQueueDetails(@Param('queueName') queueName: string): Promise<DetailedQueueInfo> {
    return await this.queueMonitoringService.getDetailedQueueInfo(queueName);
  }

  /**
   * `GET :queueName/jobs` — pages through the jobs of a queue.
   *
   * Query values arrive as strings unless a transforming pipe is installed,
   * so `limit` and `offset` are converted here. Missing, non-numeric or
   * negative values fall back to the defaults (50 and 0).
   */
  @Get(':queueName/jobs')
  async getQueueJobs(
    @Param('queueName') queueName: string,
    @Query('status') status?: string,
    @Query('limit') limit: number = 50,
    @Query('offset') offset: number = 0
  ): Promise<JobInfo[]> {
    return await this.jobService.getQueueJobs(queueName, {
      status,
      limit: this.toNonNegativeInt(limit, 50),
      offset: this.toNonNegativeInt(offset, 0),
    });
  }

  /**
   * `GET :queueName/history` — job history for a time range.
   *
   * A missing or unparseable `end` defaults to now, and a missing or
   * unparseable `start` defaults to 24 hours before `end`, so the service
   * never receives an invalid date.
   */
  @Get(':queueName/history')
  async getJobHistory(
    @Param('queueName') queueName: string,
    @Query('start') start: string,
    @Query('end') end: string,
    @Query('status') status?: string
  ): Promise<JobHistoryData> {
    const endDate = this.parseDate(end) ?? new Date();
    const startDate = this.parseDate(start) ?? new Date(endDate.getTime() - 24 * 60 * 60 * 1000);
    const timeRange = {
      start: startDate,
      end: endDate,
    };

    return await this.queueMonitoringService.getJobHistory(queueName, timeRange, status);
  }

  /** `POST :queueName/pause` — pauses the queue. */
  @Post(':queueName/pause')
  async pauseQueue(@Param('queueName') queueName: string): Promise<{ message: string }> {
    await this.jobService.pauseQueue(queueName);
    return { message: `Queue ${queueName} paused successfully` };
  }

  /** `POST :queueName/resume` — resumes the queue. */
  @Post(':queueName/resume')
  async resumeQueue(@Param('queueName') queueName: string): Promise<{ message: string }> {
    await this.jobService.resumeQueue(queueName);
    return { message: `Queue ${queueName} resumed successfully` };
  }

  /** `POST :queueName/jobs/:jobId/retry` — queues a job for another run. */
  @Post(':queueName/jobs/:jobId/retry')
  async retryJob(
    @Param('queueName') queueName: string,
    @Param('jobId') jobId: string
  ): Promise<{ message: string }> {
    await this.jobService.retryJob(queueName, jobId);
    return { message: `Job ${jobId} queued for retry` };
  }

  /**
   * `DELETE :queueName/jobs/:jobId` — cancels a job. The response message
   * tells whether the job was cancelled or could not be cancelled any more.
   */
  @Delete(':queueName/jobs/:jobId')
  async cancelJob(
    @Param('queueName') queueName: string,
    @Param('jobId') jobId: string
  ): Promise<{ message: string }> {
    const cancelled = await this.jobService.cancelJob(queueName, jobId);

    if (cancelled) {
      return { message: `Job ${jobId} cancelled successfully` };
    } else {
      return { message: `Job ${jobId} not found or already completed` };
    }
  }

  /** `DELETE :queueName/failed` — removes all failed jobs of the queue. */
  @Delete(':queueName/failed')
  async clearFailedJobs(@Param('queueName') queueName: string): Promise<{ message: string }> {
    const count = await this.jobService.clearFailedJobs(queueName);
    return { message: `Cleared ${count} failed jobs from ${queueName}` };
  }

  /**
   * `GET :queueName/metrics` — performance metrics of the queue.
   *
   * NOTE(review): `period` is accepted but not forwarded; the service call
   * takes only the queue name.
   */
  @Get(':queueName/metrics')
  async getQueueMetrics(
    @Param('queueName') queueName: string,
    @Query('period') period: string = '24h'
  ): Promise<PerformanceMetrics> {
    return await this.queueMonitoringService.getQueuePerformanceMetrics(queueName);
  }

  /** Converts a raw query value to a non-negative integer, or returns `fallback`. */
  private toNonNegativeInt(value: unknown, fallback: number): number {
    const parsed = typeof value === 'number' ? value : Number.parseInt(String(value), 10);
    return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
  }

  /** Parses a raw query value as a date; returns undefined when missing or invalid. */
  private parseDate(value: unknown): Date | undefined {
    if (value === undefined || value === null || value === '') {
      return undefined;
    }
    const parsed = new Date(String(value));
    return Number.isNaN(parsed.getTime()) ? undefined : parsed;
  }
}

// Types and Interfaces for Monitoring
interface QueueOverview {
  timestamp: Date;
  queues: QueueStats[];
  systemMetrics: SystemMetrics;
  alerts: Alert[];
  summary: QueueSummary;
}

interface DetailedQueueInfo {
  queueName: string;
  stats: QueueStats;
  recentJobs: JobInfo[];
  performance: PerformanceMetrics;
  healthScore: number;
  recommendations: string[];
}

interface JobHistoryData {
  jobs: JobInfo[];
  statistics: {
    totalJobs: number;
    successRate: number;
    averageProcessingTime: number;
    peakHours: number[];
  };
}

interface SystemMetrics {
  memory: NodeJS.MemoryUsage;
  cpu: NodeJS.CpuUsage;
  uptime: number;
  redisConnections: number;
  activeWorkers: number;
}

/**
 * A single monitoring alert attached to a queue.
 */
interface Alert {
  // Free-form alert category; the set of values is not constrained by this type.
  type: string;
  // `RealTimeQueueMonitoring` treats 'HIGH' alerts as critical and forwards them immediately.
  severity: 'LOW' | 'MEDIUM' | 'HIGH';
  // NOTE(review): presumably the name of the affected queue — confirm.
  queue: string;
  // Human-readable description of the alert.
  message: string;
  // NOTE(review): presumably when the alert was raised — confirm.
  timestamp: Date;
}

/**
 * Totals across queues plus an overall health score.
 */
interface QueueSummary {
  // Number of queues covered by the summary.
  totalQueues: number;
  // NOTE(review): the four totals below are presumably job counts per state summed over all queues — confirm.
  totalWaiting: number;
  totalActive: number;
  totalCompleted: number;
  totalFailed: number;
  // NOTE(review): scale/range of the score is not defined in this section — confirm.
  overallHealthScore: number;
}

/**
 * Description of a single job as exposed by the monitoring types.
 */
interface JobInfo {
  id: string;
  name: string;
  // Untyped state label; the set of allowed values is not constrained here.
  state: string;
  // Job payload. Typed `any`, so consumers get no compile-time checking on it.
  data: any;
  // NOTE(review): scale (e.g. 0–100) is not defined in this section — confirm.
  progress: number;
  createdAt: Date;
  // Optional lifecycle fields; each may be absent.
  processedAt?: Date;
  finishedAt?: Date;
  failedReason?: string;
  // NOTE(review): presumably the number of attempts made so far — confirm.
  attempts: number;
}

/**
 * Performance figures for a queue: three scalar values plus a time series
 * for each of them. This is the return type of the `getQueueMetrics`
 * controller endpoint.
 */
interface PerformanceMetrics {
  // NOTE(review): units of the three scalars (ms, jobs per time unit, fraction vs. percent) are not visible here — confirm.
  averageProcessingTime: number;
  throughput: number;
  errorRate: number;
  // Time series as (timestamp, value) points. Note the keys are snake_case,
  // unlike the camelCase scalars above.
  trends: {
    processing_time: Array<{ timestamp: Date; value: number }>;
    throughput: Array<{ timestamp: Date; value: number }>;
    error_rate: Array<{ timestamp: Date; value: number }>;
  };
}

// Real-time Monitoring with WebSockets
/**
 * Polls the queue monitoring service on a fixed interval, hands each snapshot
 * to `broadcastQueueUpdate` and forwards alerts with severity `'HIGH'` to
 * `sendCriticalAlerts`.
 *
 * Polling starts as soon as the provider is constructed. The timer handle is
 * kept so that it can be cleared in `onModuleDestroy`; without that the
 * interval would keep firing (and keep the Node process alive) after the
 * application has been shut down.
 */
@Injectable()
export class RealTimeQueueMonitoring {
  /** Delay between two monitoring polls, in milliseconds. */
  private static readonly POLL_INTERVAL_MS = 5000;

  /** Handle of the running poll timer; `undefined` while polling is stopped. */
  private monitoringTimer?: ReturnType<typeof setInterval>;

  constructor(
    private queueMonitoringService: QueueMonitoringService,
    @InjectQueue('monitoring') private monitoringQueue: Queue,
  ) {
    this.startRealTimeMonitoring();
  }

  /**
   * Nest lifecycle hook, invoked during application shutdown (for example via
   * `app.close()`). Stops the poll timer so no further updates are produced.
   * Safe to call more than once.
   */
  onModuleDestroy(): void {
    if (this.monitoringTimer !== undefined) {
      clearInterval(this.monitoringTimer);
      this.monitoringTimer = undefined;
    }
  }

  /**
   * Starts the periodic poll. Does nothing if a timer is already running, so
   * repeated calls never create a second interval.
   */
  private startRealTimeMonitoring(): void {
    if (this.monitoringTimer !== undefined) {
      return;
    }

    // Setup real-time monitoring that pushes updates via WebSocket
    this.monitoringTimer = setInterval(() => {
      // pollOnce handles its own errors, so the promise never rejects.
      void this.pollOnce();
    }, RealTimeQueueMonitoring.POLL_INTERVAL_MS);
  }

  /**
   * Performs a single monitoring cycle: fetch the overview, broadcast it and
   * forward critical alerts. Errors are logged and swallowed so that one
   * failed cycle does not stop subsequent ones.
   */
  private async pollOnce(): Promise<void> {
    try {
      const overview = await this.queueMonitoringService.getComprehensiveQueueStatus();

      // Emit to connected WebSocket clients
      this.broadcastQueueUpdate(overview);

      // Check for critical alerts
      const criticalAlerts = overview.alerts.filter(alert => alert.severity === 'HIGH');
      if (criticalAlerts.length > 0) {
        this.sendCriticalAlerts(criticalAlerts);
      }
    } catch (error) {
      console.error('Real-time monitoring error:', error);
    }
  }

  /**
   * Placeholder for pushing a snapshot to clients; currently only logs.
   *
   * @param overview - The snapshot to broadcast (not yet used by the stub).
   */
  private broadcastQueueUpdate(overview: QueueOverview): void {
    // Implementation would use WebSocket gateway to broadcast updates
    console.log('Broadcasting queue update to connected clients');
  }

  /**
   * Placeholder for immediate notifications; currently only logs the alerts.
   *
   * @param alerts - The alerts with severity `'HIGH'` from the latest snapshot.
   */
  private sendCriticalAlerts(alerts: Alert[]): void {
    // Implementation would send immediate notifications
    console.log('Sending critical alerts:', alerts);
  }
}

Task Scheduling und Background Jobs sind fundamentale Bausteine für skalierbare NestJS-Anwendungen. Die Kombination aus zeitgesteuerten Cron Jobs und robusten Queue-basierten Background Jobs ermöglicht es, komplexe Workflows zu orchestrieren, System-Performance zu optimieren und Benutzererfahrungen durch asynchrone Verarbeitung zu verbessern. Mit umfassendem Monitoring und intelligenten Retry-Mechanismen können Sie zuverlässige und wartbare Task-Processing-Systeme aufbauen.