19 WebSockets und Real-time Features

WebSockets ermöglichen bidirektionale, echtzeitfähige Kommunikation zwischen Client und Server und sind essentiell für moderne Webanwendungen. NestJS bietet eine elegante und typisierte Implementierung für WebSocket-basierte Features, die sich nahtlos in die bestehende Architektur integriert.

19.1 WebSocket Gateway

Ein WebSocket Gateway in NestJS fungiert als Eingangstor für WebSocket-Verbindungen und definiert, wie eingehende Nachrichten verarbeitet werden.

19.1.1 Grundlagen eines WebSocket Gateways

import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  MessageBody,
  ConnectedSocket,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway({
  cors: {
    origin: ['http://localhost:3000', 'https://myapp.com'],
    credentials: true,
  },
  namespace: '/chat',
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);
    client.emit('welcome', { message: 'Willkommen im Chat!' });
  }

  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
  }

  @SubscribeMessage('message')
  handleMessage(
    @MessageBody() data: { text: string; room: string },
    @ConnectedSocket() client: Socket,
  ) {
    // Nachricht an alle Clients in einem Raum senden
    this.server.to(data.room).emit('message', {
      id: Date.now(),
      text: data.text,
      sender: client.id,
      timestamp: new Date(),
    });
  }
}

19.1.2 Gateway-Konfiguration

import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';
import { ChatService } from './chat.service';

@Module({
  providers: [ChatGateway, ChatService],
})
export class ChatModule {}

Der @WebSocketGateway Dekorator akzeptiert verschiedene Konfigurationsoptionen:

@WebSocketGateway({
  port: 3001, // Spezifischer Port (optional)
  namespace: '/chat', // Namespace für Organisation
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
    credentials: true,
  },
  transports: ['websocket', 'polling'], // Verfügbare Transportmethoden
})

19.2 Socket.IO Integration

NestJS nutzt standardmäßig Socket.IO, eine robuste WebSocket-Bibliothek mit Fallback-Mechanismen und erweiterten Features.

19.2.1 Erweiterte Socket.IO Features

import { Injectable } from '@nestjs/common';
import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  MessageBody,
  ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@Injectable()
@WebSocketGateway()
export class NotificationGateway {
  @WebSocketServer()
  server: Server;

  // Nachricht an alle verbundenen Clients
  broadcastToAll(event: string, data: any) {
    this.server.emit(event, data);
  }

  // Nachricht an spezifischen Client
  sendToClient(clientId: string, event: string, data: any) {
    this.server.to(clientId).emit(event, data);
  }

  // Nachricht an alle Clients außer einem bestimmten
  broadcastExcept(excludeClientId: string, event: string, data: any) {
    this.server.broadcast.to(excludeClientId).emit(event, data);
  }

  @SubscribeMessage('joinNotificationGroup')
  handleJoinGroup(
    @MessageBody() groupId: string,
    @ConnectedSocket() client: Socket,
  ) {
    client.join(`notifications:${groupId}`);
    client.emit('joinedGroup', { groupId, success: true });
  }

  // Business Logic für Benachrichtigungen
  async sendGroupNotification(groupId: string, notification: any) {
    this.server.to(`notifications:${groupId}`).emit('notification', notification);
  }
}

19.2.2 Client-seitiges Setup

// client.ts
import { io, Socket } from 'socket.io-client';

class SocketClient {
  private socket: Socket;

  constructor() {
    this.socket = io('http://localhost:3000/chat', {
      auth: {
        token: localStorage.getItem('authToken'),
      },
      transports: ['websocket'],
    });

    this.setupEventListeners();
  }

  private setupEventListeners() {
    this.socket.on('connect', () => {
      console.log('Verbunden mit Server');
    });

    this.socket.on('message', (data) => {
      console.log('Neue Nachricht:', data);
    });

    this.socket.on('disconnect', () => {
      console.log('Verbindung getrennt');
    });
  }

  sendMessage(text: string, room: string) {
    this.socket.emit('message', { text, room });
  }

  joinRoom(room: string) {
    this.socket.emit('joinRoom', room);
  }
}

19.3 Real-time Events

Real-time Events ermöglichen es, Daten sofort zwischen Client und Server zu synchronisieren.

19.3.1 Event-basierte Architektur

import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

// Event-Typen definieren
export interface UserStatusEvent {
  userId: string;
  status: 'online' | 'offline' | 'away';
  timestamp: Date;
}

export interface ChatMessageEvent {
  id: string;
  roomId: string;
  userId: string;
  message: string;
  timestamp: Date;
}

@Injectable()
@WebSocketGateway()
export class RealtimeGateway {
  @WebSocketServer()
  server: Server;

  constructor(private eventEmitter: EventEmitter2) {
    // Event Listener für interne Events
    this.eventEmitter.on('user.status.changed', this.handleUserStatusChange.bind(this));
    this.eventEmitter.on('chat.message.created', this.handleNewMessage.bind(this));
  }

  private handleUserStatusChange(event: UserStatusEvent) {
    // Status-Update an alle relevanten Clients senden
    this.server.emit('userStatusUpdate', {
      userId: event.userId,
      status: event.status,
      timestamp: event.timestamp,
    });
  }

  private handleNewMessage(event: ChatMessageEvent) {
    // Nachricht an alle Clients im Raum
    this.server.to(`room:${event.roomId}`).emit('newMessage', event);
  }
}

// Service für Business Logic
@Injectable()
export class UserStatusService {
  constructor(private eventEmitter: EventEmitter2) {}

  async updateUserStatus(userId: string, status: 'online' | 'offline' | 'away') {
    // Geschäftslogik ausführen
    await this.saveStatusToDatabase(userId, status);

    // Event emittieren für Real-time Update
    this.eventEmitter.emit('user.status.changed', {
      userId,
      status,
      timestamp: new Date(),
    } as UserStatusEvent);
  }

  private async saveStatusToDatabase(userId: string, status: string) {
    // Datenbank-Update Logik
  }
}

19.3.2 Reactive Programming mit RxJS

import { Observable, fromEvent, merge } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';

@WebSocketGateway()
export class ReactiveGateway {
  @WebSocketServer()
  server: Server;

  handleConnection(client: Socket) {
    // Observable für Client-Events erstellen
    const messageStream$ = fromEvent(client, 'message').pipe(
      map((data: any) => ({ type: 'message', data, clientId: client.id })),
    );

    const typingStream$ = fromEvent(client, 'typing').pipe(
      map((data: any) => ({ type: 'typing', data, clientId: client.id })),
      debounceTime(300), // Debounce für Typing-Events
    );

    // Streams kombinieren
    const clientEvents$ = merge(messageStream$, typingStream$);

    // Stream abonnieren
    clientEvents$.subscribe((event) => {
      this.handleClientEvent(event);
    });
  }

  private handleClientEvent(event: any) {
    switch (event.type) {
      case 'message':
        this.broadcastMessage(event.data, event.clientId);
        break;
      case 'typing':
        this.broadcastTyping(event.data, event.clientId);
        break;
    }
  }

  private broadcastMessage(data: any, senderId: string) {
    this.server.broadcast.to(senderId).emit('message', data);
  }

  private broadcastTyping(data: any, senderId: string) {
    this.server.broadcast.to(senderId).emit('userTyping', data);
  }
}

19.4 Room Management

Rooms ermöglichen es, Clients in logische Gruppen zu organisieren und gezielte Nachrichten zu senden.

19.4.1 Dynamisches Room Management

@Injectable()
@WebSocketGateway()
export class RoomGateway {
  @WebSocketServer()
  server: Server;

  private userRooms = new Map<string, Set<string>>(); // userId -> Set of roomIds
  private roomMembers = new Map<string, Set<string>>(); // roomId -> Set of userIds

  @SubscribeMessage('joinRoom')
  async handleJoinRoom(
    @MessageBody() data: { roomId: string; userId: string },
    @ConnectedSocket() client: Socket,
  ) {
    const { roomId, userId } = data;

    // Autorisation prüfen
    if (!(await this.canUserJoinRoom(userId, roomId))) {
      client.emit('error', { message: 'Zugang zu diesem Raum verweigert' });
      return;
    }

    // Client zu Room hinzufügen
    client.join(roomId);

    // Interne Tracking-Strukturen aktualisieren
    this.addUserToRoom(userId, roomId);

    // Andere Mitglieder benachrichtigen
    client.to(roomId).emit('userJoined', {
      userId,
      roomId,
      timestamp: new Date(),
    });

    // Client über erfolgreichen Beitritt informieren
    client.emit('joinedRoom', {
      roomId,
      members: Array.from(this.roomMembers.get(roomId) || []),
    });
  }

  @SubscribeMessage('leaveRoom')
  handleLeaveRoom(
    @MessageBody() data: { roomId: string; userId: string },
    @ConnectedSocket() client: Socket,
  ) {
    const { roomId, userId } = data;

    client.leave(roomId);
    this.removeUserFromRoom(userId, roomId);

    // Andere Mitglieder benachrichtigen
    client.to(roomId).emit('userLeft', {
      userId,
      roomId,
      timestamp: new Date(),
    });
  }

  @SubscribeMessage('getRoomInfo')
  handleGetRoomInfo(
    @MessageBody() roomId: string,
    @ConnectedSocket() client: Socket,
  ) {
    const members = this.roomMembers.get(roomId) || new Set();
    const socketsInRoom = this.server.sockets.adapter.rooms.get(roomId);

    client.emit('roomInfo', {
      roomId,
      memberCount: members.size,
      onlineCount: socketsInRoom?.size || 0,
      members: Array.from(members),
    });
  }

  handleDisconnect(client: Socket) {
    // Benutzer aus allen Räumen entfernen
    const userId = this.getUserIdFromSocket(client);
    if (userId) {
      this.removeUserFromAllRooms(userId);
    }
  }

  private addUserToRoom(userId: string, roomId: string) {
    // Benutzer-Räume aktualisieren
    if (!this.userRooms.has(userId)) {
      this.userRooms.set(userId, new Set());
    }
    this.userRooms.get(userId)!.add(roomId);

    // Raum-Mitglieder aktualisieren
    if (!this.roomMembers.has(roomId)) {
      this.roomMembers.set(roomId, new Set());
    }
    this.roomMembers.get(roomId)!.add(userId);
  }

  private removeUserFromRoom(userId: string, roomId: string) {
    this.userRooms.get(userId)?.delete(roomId);
    this.roomMembers.get(roomId)?.delete(userId);

    // Leere Sets aufräumen
    if (this.userRooms.get(userId)?.size === 0) {
      this.userRooms.delete(userId);
    }
    if (this.roomMembers.get(roomId)?.size === 0) {
      this.roomMembers.delete(roomId);
    }
  }

  private removeUserFromAllRooms(userId: string) {
    const userRooms = this.userRooms.get(userId);
    if (userRooms) {
      userRooms.forEach((roomId) => {
        this.server.to(roomId).emit('userLeft', {
          userId,
          roomId,
          timestamp: new Date(),
        });
        this.roomMembers.get(roomId)?.delete(userId);
      });
      this.userRooms.delete(userId);
    }
  }

  private async canUserJoinRoom(userId: string, roomId: string): Promise<boolean> {
    // Hier würde die Autorisierungslogik implementiert werden
    // z.B. Datenbankabfrage, Rollenprüfung, etc.
    return true;
  }

  private getUserIdFromSocket(socket: Socket): string | null {
    // Socket-Metadaten oder Token auswerten
    return socket.handshake.auth?.userId || null;
  }
}

19.4.2 Room-basierte Nachrichten

@Injectable()
export class RoomMessageService {
  constructor(private roomGateway: RoomGateway) {}

  async sendMessageToRoom(roomId: string, message: any, senderId: string) {
    // Nachricht in Datenbank speichern
    const savedMessage = await this.saveMessage(message, roomId, senderId);

    // Nachricht an alle Raum-Mitglieder senden
    this.roomGateway.server.to(roomId).emit('newMessage', savedMessage);

    return savedMessage;
  }

  async sendPrivateMessage(fromUserId: string, toUserId: string, message: any) {
    // Private Room ID generieren
    const privateRoomId = this.generatePrivateRoomId(fromUserId, toUserId);

    // Nachricht senden
    const savedMessage = await this.saveMessage(message, privateRoomId, fromUserId);

    // Beide Benutzer benachrichtigen
    this.roomGateway.server.to(privateRoomId).emit('privateMessage', savedMessage);

    return savedMessage;
  }

  private generatePrivateRoomId(userId1: string, userId2: string): string {
    // Konsistente Room-ID für private Nachrichten
    const sortedIds = [userId1, userId2].sort();
    return `private:${sortedIds[0]}:${sortedIds[1]}`;
  }

  private async saveMessage(message: any, roomId: string, senderId: string) {
    // Datenbanklogik für Nachrichtenspeicherung
    return {
      id: Date.now().toString(),
      ...message,
      roomId,
      senderId,
      timestamp: new Date(),
    };
  }
}

19.5 Authentication mit WebSockets

Sicherheit ist bei WebSocket-Verbindungen von entscheidender Bedeutung, da sie lange bestehen und sensible Daten übertragen können.

19.5.1 JWT-basierte Authentication

import { Injectable, UnauthorizedException } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
import { Socket } from 'socket.io';

@Injectable()
export class WsAuthService {
  constructor(private jwtService: JwtService) {}

  async validateConnection(socket: Socket): Promise<any> {
    try {
      const token = this.extractTokenFromSocket(socket);
      const payload = await this.jwtService.verifyAsync(token);
      
      // User-Informationen zur Socket hinzufügen
      socket.data.user = payload;
      return payload;
    } catch (error) {
      throw new UnauthorizedException('Invalid token');
    }
  }

  private extractTokenFromSocket(socket: Socket): string {
    // Token aus verschiedenen Quellen extrahieren
    const token = 
      socket.handshake.auth?.token || 
      socket.handshake.headers?.authorization?.replace('Bearer ', '') ||
      socket.request.headers?.authorization?.replace('Bearer ', '');

    if (!token) {
      throw new UnauthorizedException('No token provided');
    }

    return token;
  }
}

// Auth Guard für WebSockets
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';

@Injectable()
export class WsAuthGuard implements CanActivate {
  constructor(private wsAuthService: WsAuthService) {}

  async canActivate(context: ExecutionContext): Promise<boolean> {
    const socket = context.switchToWs().getClient<Socket>();
    
    try {
      await this.wsAuthService.validateConnection(socket);
      return true;
    } catch (error) {
      socket.emit('error', { message: 'Authentication failed' });
      socket.disconnect();
      return false;
    }
  }
}

19.5.2 Authenticated Gateway

import { UseGuards } from '@nestjs/common';

@UseGuards(WsAuthGuard)
@WebSocketGateway({
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(','),
    credentials: true,
  },
})
export class AuthenticatedChatGateway {
  @WebSocketServer()
  server: Server;

  constructor(private wsAuthService: WsAuthService) {}

  async handleConnection(client: Socket) {
    try {
      const user = await this.wsAuthService.validateConnection(client);
      console.log(`Authenticated user connected: ${user.username}`);
      
      // Benutzer-spezifische Räume beitreten
      client.join(`user:${user.userId}`);
      
      // Online-Status broadcasten
      this.broadcastUserStatus(user.userId, 'online');
      
    } catch (error) {
      client.emit('authError', { message: 'Authentication failed' });
      client.disconnect();
    }
  }

  handleDisconnect(client: Socket) {
    const user = client.data.user;
    if (user) {
      this.broadcastUserStatus(user.userId, 'offline');
    }
  }

  @UseGuards(WsAuthGuard)
  @SubscribeMessage('secureMessage')
  handleSecureMessage(
    @MessageBody() data: any,
    @ConnectedSocket() client: Socket,
  ) {
    const user = client.data.user;
    
    // Autorisation für spezifische Aktionen
    if (!this.canUserPerformAction(user, data.action)) {
      client.emit('error', { message: 'Insufficient permissions' });
      return;
    }

    // Sichere Nachrichtenverarbeitung
    this.processSecureMessage(data, user);
  }

  private broadcastUserStatus(userId: string, status: string) {
    this.server.emit('userStatusChange', { userId, status });
  }

  private canUserPerformAction(user: any, action: string): boolean {
    // Rollenbasierte Autorisation
    const userRoles = user.roles || [];
    const requiredRole = this.getRequiredRoleForAction(action);
    
    return userRoles.includes(requiredRole);
  }

  private getRequiredRoleForAction(action: string): string {
    const actionRoleMap = {
      'deleteMessage': 'moderator',
      'banUser': 'admin',
      'createRoom': 'moderator',
    };
    
    return actionRoleMap[action] || 'user';
  }

  private processSecureMessage(data: any, user: any) {
    // Sichere Nachrichtenverarbeitung mit Benutzerkontext
  }
}

19.5.3 Session-basierte Authentication

import { Injectable } from '@nestjs/common';
import { Socket } from 'socket.io';

@Injectable()
export class SessionAuthService {
  async validateSocketSession(socket: Socket): Promise<any> {
    const sessionId = socket.request.headers.cookie
      ?.split(';')
      .find(c => c.trim().startsWith('sessionId='))
      ?.split('=')[1];

    if (!sessionId) {
      throw new UnauthorizedException('No session found');
    }

    // Session aus Store laden (Redis, Database, etc.)
    const session = await this.loadSession(sessionId);
    
    if (!session || !session.user) {
      throw new UnauthorizedException('Invalid session');
    }

    return session.user;
  }

  private async loadSession(sessionId: string): Promise<any> {
    // Session-Store-Implementierung
    // z.B. Redis, Database, etc.
    return null; // Placeholder
  }
}

19.6 Broadcasting und Namespaces

Broadcasting ermöglicht es, Nachrichten an mehrere Clients gleichzeitig zu senden, während Namespaces die logische Trennung verschiedener Anwendungsbereiche ermöglichen.

19.6.1 Namespace-basierte Architektur

// Chat Namespace
@WebSocketGateway({ namespace: '/chat' })
export class ChatNamespace {
  @WebSocketServer()
  server: Server;

  @SubscribeMessage('message')
  handleChatMessage(@MessageBody() data: any) {
    this.server.emit('message', data);
  }
}

// Notifications Namespace
@WebSocketGateway({ namespace: '/notifications' })
export class NotificationsNamespace {
  @WebSocketServer()
  server: Server;

  @SubscribeMessage('subscribe')
  handleSubscription(@MessageBody() data: { topics: string[] }, @ConnectedSocket() client: Socket) {
    data.topics.forEach(topic => {
      client.join(`topic:${topic}`);
    });
  }

  async sendNotificationToTopic(topic: string, notification: any) {
    this.server.to(`topic:${topic}`).emit('notification', notification);
  }
}

// Admin Namespace
@WebSocketGateway({ namespace: '/admin' })
@UseGuards(AdminAuthGuard)
export class AdminNamespace {
  @WebSocketServer()
  server: Server;

  @SubscribeMessage('systemAlert')
  handleSystemAlert(@MessageBody() alert: any) {
    // Nur an Administratoren senden
    this.server.emit('systemAlert', alert);
  }
}

19.6.2 Erweiterte Broadcasting-Strategien

@Injectable()
export class BroadcastService {
  constructor(
    @InjectRepository(User) private userRepository: Repository<User>,
    @InjectRepository(Room) private roomRepository: Repository<Room>,
  ) {}

  // Geographical Broadcasting
  async broadcastToRegion(region: string, event: string, data: any) {
    const server = this.getSocketServer();
    
    // Alle Clients in der Region finden
    const sockets = await server.fetchSockets();
    const regionalSockets = sockets.filter(socket => 
      socket.data.user?.region === region
    );

    regionalSockets.forEach(socket => {
      socket.emit(event, data);
    });
  }

  // Role-based Broadcasting
  async broadcastToRole(role: string, event: string, data: any) {
    const server = this.getSocketServer();
    const sockets = await server.fetchSockets();
    
    const roleSockets = sockets.filter(socket => 
      socket.data.user?.roles?.includes(role)
    );

    roleSockets.forEach(socket => {
      socket.emit(event, data);
    });
  }

  // Conditional Broadcasting
  async conditionalBroadcast(
    condition: (user: any) => boolean,
    event: string,
    data: any
  ) {
    const server = this.getSocketServer();
    const sockets = await server.fetchSockets();
    
    const filteredSockets = sockets.filter(socket => 
      condition(socket.data.user)
    );

    filteredSockets.forEach(socket => {
      socket.emit(event, data);
    });
  }

  // Batch Broadcasting mit Rate Limiting
  async batchBroadcast(
    recipients: string[],
    event: string,
    data: any,
    batchSize: number = 100,
    delayMs: number = 100
  ) {
    const server = this.getSocketServer();
    
    for (let i = 0; i < recipients.length; i += batchSize) {
      const batch = recipients.slice(i, i + batchSize);
      
      batch.forEach(userId => {
        server.to(`user:${userId}`).emit(event, data);
      });

      // Rate Limiting
      if (i + batchSize < recipients.length) {
        await this.delay(delayMs);
      }
    }
  }

  private getSocketServer(): Server {
    // Server-Instanz von einem der Gateways abrufen
    // Implementation abhängig von der spezifischen Architektur
    return null; // Placeholder
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

19.6.3 Cross-Namespace Communication

@Injectable()
export class CrossNamespaceService {
  private namespaceServers = new Map<string, Server>();

  registerNamespace(name: string, server: Server) {
    this.namespaceServers.set(name, server);
  }

  // Event von einem Namespace an einen anderen weiterleiten
  forwardEvent(
    fromNamespace: string,
    toNamespace: string,
    event: string,
    data: any,
    filter?: (socket: Socket) => boolean
  ) {
    const targetServer = this.namespaceServers.get(toNamespace);
    
    if (!targetServer) {
      console.warn(`Namespace ${toNamespace} not found`);
      return;
    }

    if (filter) {
      // Gefilterte Weiterleitung
      targetServer.fetchSockets().then(sockets => {
        sockets.filter(filter).forEach(socket => {
          socket.emit(event, { ...data, forwardedFrom: fromNamespace });
        });
      });
    } else {
      // Broadcast an alle
      targetServer.emit(event, { ...data, forwardedFrom: fromNamespace });
    }
  }

  // Global Event Broadcasting über alle Namespaces
  globalBroadcast(event: string, data: any) {
    this.namespaceServers.forEach((server, namespace) => {
      server.emit(event, { ...data, namespace });
    });
  }
}

// Usage in Gateway
@WebSocketGateway({ namespace: '/chat' })
export class ChatNamespace {
  @WebSocketServer()
  server: Server;

  constructor(private crossNamespaceService: CrossNamespaceService) {}

  afterInit() {
    this.crossNamespaceService.registerNamespace('chat', this.server);
  }

  @SubscribeMessage('importantMessage')
  handleImportantMessage(@MessageBody() data: any) {
    // Nachricht auch an Admin-Namespace weiterleiten
    this.crossNamespaceService.forwardEvent(
      'chat',
      'admin',
      'chatAlert',
      data,
      (socket) => socket.data.user?.role === 'admin'
    );
  }
}

19.6.4 Performance-Optimierung für Broadcasting

@Injectable()
export class OptimizedBroadcastService {
  private broadcastQueue = new Map<string, any[]>();
  private isProcessing = false;

  // Batch Broadcasting für bessere Performance
  async queueBroadcast(roomId: string, event: string, data: any) {
    if (!this.broadcastQueue.has(roomId)) {
      this.broadcastQueue.set(roomId, []);
    }

    this.broadcastQueue.get(roomId)!.push({ event, data, timestamp: Date.now() });

    if (!this.isProcessing) {
      this.processBroadcastQueue();
    }
  }

  private async processBroadcastQueue() {
    this.isProcessing = true;

    while (this.broadcastQueue.size > 0) {
      const promises: Promise<void>[] = [];

      for (const [roomId, events] of this.broadcastQueue.entries()) {
        if (events.length > 0) {
          const batchedEvents = events.splice(0, 10); // Batch von 10 Events
          
          promises.push(
            this.processBatchedEvents(roomId, batchedEvents)
          );
        }

        // Leere Queues entfernen
        if (events.length === 0) {
          this.broadcastQueue.delete(roomId);
        }
      }

      await Promise.all(promises);
      await this.delay(50); // Kleine Pause zwischen Batches
    }

    this.isProcessing = false;
  }

  private async processBatchedEvents(roomId: string, events: any[]) {
    const server = this.getSocketServer();
    
    // Events nach Typ gruppieren für Optimierung
    const groupedEvents = this.groupEventsByType(events);

    for (const [eventType, eventData] of groupedEvents.entries()) {
      server.to(roomId).emit(eventType, eventData);
    }
  }

  private groupEventsByType(events: any[]): Map<string, any[]> {
    const grouped = new Map<string, any[]>();

    events.forEach(({ event, data }) => {
      if (!grouped.has(event)) {
        grouped.set(event, []);
      }
      grouped.get(event)!.push(data);
    });

    return grouped;
  }

  // Compression für große Datenmengen
  private compressLargePayload(data: any): any {
    const stringified = JSON.stringify(data);
    
    if (stringified.length > 1024) { // 1KB Threshold
      // Implementierung einer Komprimierungslogik
      return {
        compressed: true,
        data: this.compress(stringified),
      };
    }

    return data;
  }

  private compress(data: string): string {
    // Hier würde eine echte Komprimierung implementiert werden
    // z.B. mit zlib oder einer anderen Bibliothek
    return data; // Placeholder
  }

  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  private getSocketServer(): Server {
    // Server-Instanz abrufen
    return null; // Placeholder
  }
}

19.7 Best Practices und Sicherheitshinweise

19.7.1 Sicherheitsüberlegungen

import { Injectable } from '@nestjs/common';
import { ThrottlerGuard } from '@nestjs/throttler';

// Rate Limiting für WebSocket-Events
@Injectable()
export class WsThrottlerGuard extends ThrottlerGuard {
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const socket = context.switchToWs().getClient();
    const userId = socket.data.user?.id;

    if (!userId) {
      return false;
    }

    return super.canActivate(context);
  }

  protected getTracker(req: Record<string, any>): string {
    return req.data.user?.id || req.id;
  }
}

// Input Validation für WebSocket-Nachrichten
import { IsString, IsNotEmpty, Length } from 'class-validator';

export class ChatMessageDto {
  @IsString()
  @IsNotEmpty()
  @Length(1, 500)
  message: string;

  @IsString()
  @IsNotEmpty()
  roomId: string;
}

@UseGuards(WsThrottlerGuard)
@WebSocketGateway()
export class SecureChatGateway {
  @UsePipes(new ValidationPipe())
  @SubscribeMessage('message')
  handleMessage(
    @MessageBody() data: ChatMessageDto,
    @ConnectedSocket() client: Socket,
  ) {
    // Nachricht ist bereits validiert
    this.processValidatedMessage(data, client);
  }

  private processValidatedMessage(data: ChatMessageDto, client: Socket) {
    // Sichere Verarbeitung der validierten Nachricht
  }
}

19.7.2 Performance-Best Practices

// Connection Pool Management
@Injectable()
export class ConnectionManager {
  private connections = new Map<string, Socket>();
  private readonly maxConnections = 10000;

  addConnection(userId: string, socket: Socket): boolean {
    if (this.connections.size >= this.maxConnections) {
      return false; // Verbindung abgelehnt
    }

    // Bestehende Verbindung schließen falls vorhanden
    const existingConnection = this.connections.get(userId);
    if (existingConnection) {
      existingConnection.disconnect();
    }

    this.connections.set(userId, socket);
    return true;
  }

  removeConnection(userId: string) {
    this.connections.delete(userId);
  }

  getConnectionCount(): number {
    return this.connections.size;
  }

  // Cleanup inaktiver Verbindungen
  async cleanupInactiveConnections() {
    const now = Date.now();
    const inactiveThreshold = 5 * 60 * 1000; // 5 Minuten

    for (const [userId, socket] of this.connections.entries()) {
      const lastActivity = socket.data.lastActivity || socket.handshake.time;
      
      if (now - lastActivity > inactiveThreshold) {
        socket.disconnect();
        this.connections.delete(userId);
      }
    }
  }
}

// Memory-effiziente Event-Verarbeitung
@Injectable()
export class EfficientEventProcessor {
  private eventBuffer = new Map<string, any[]>();
  private bufferSize = 100;

  processEvent(userId: string, event: any) {
    // Event zu Buffer hinzufügen
    if (!this.eventBuffer.has(userId)) {
      this.eventBuffer.set(userId, []);
    }

    const userBuffer = this.eventBuffer.get(userId)!;
    userBuffer.push(event);

    // Buffer-Größe kontrollieren
    if (userBuffer.length > this.bufferSize) {
      userBuffer.shift(); // Ältestes Event entfernen
    }
  }

  getRecentEvents(userId: string, count: number = 10): any[] {
    const userBuffer = this.eventBuffer.get(userId) || [];
    return userBuffer.slice(-count);
  }

  clearUserBuffer(userId: string) {
    this.eventBuffer.delete(userId);
  }
}

WebSockets in NestJS bieten eine mächtige Grundlage für Echtzeit-Anwendungen. Die typisierte Integration, kombiniert mit den robusten Sicherheits- und Performance-Features, macht NestJS zu einer exzellenten Wahl für moderne Real-time-Anwendungen. Die modulare Architektur ermöglicht es, WebSocket-Funktionalitäten sauber zu organisieren und zu skalieren, während die umfangreichen Konfigurationsmöglichkeiten eine Anpassung an spezifische Anforderungen erlauben.