WebSockets ermöglichen bidirektionale, echtzeitfähige Kommunikation zwischen Client und Server und sind essentiell für moderne Webanwendungen. NestJS bietet eine elegante und typisierte Implementierung für WebSocket-basierte Features, die sich nahtlos in die bestehende Architektur integriert.
Ein WebSocket Gateway in NestJS fungiert als Eingangstor für WebSocket-Verbindungen und definiert, wie eingehende Nachrichten verarbeitet werden.
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
/**
 * Socket.IO gateway for the `/chat` namespace.
 *
 * Greets every new connection, lets clients join rooms and relays chat
 * messages to the room named in the payload.
 */
@WebSocketGateway({
  cors: {
    origin: ['http://localhost:3000', 'https://myapp.com'],
    credentials: true,
  },
  namespace: '/chat',
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  /** Logs the new connection and sends a welcome event to that client only. */
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);
    client.emit('welcome', { message: 'Willkommen im Chat!' });
  }

  /** Logs the disconnect; Socket.IO removes the socket from its rooms itself. */
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
  }

  /**
   * Adds the calling socket to a room.
   *
   * Without this handler nobody is ever a member of a room on this namespace,
   * so the room broadcast in `handleMessage` would reach no one.
   *
   * @param room Name of the room to join; ignored unless it is a non-empty string.
   */
  @SubscribeMessage('joinRoom')
  handleJoinRoom(
    @MessageBody() room: string,
    @ConnectedSocket() client: Socket,
  ) {
    if (typeof room !== 'string' || room.length === 0) {
      return;
    }
    client.join(room);
  }

  /**
   * Relays a chat message to every socket in `data.room`.
   *
   * @param data Message text and the target room.
   * @param client Sending socket; its id is reported as the sender.
   */
  @SubscribeMessage('message')
  handleMessage(
    @MessageBody() data: { text: string; room: string },
    @ConnectedSocket() client: Socket,
  ) {
    // Send the message to all clients in the room.
    this.server.to(data.room).emit('message', {
      id: Date.now(),
      text: data.text,
      sender: client.id,
      timestamp: new Date(),
    });
  }
}
import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';
import { ChatService } from './chat.service';
// Feature module that registers ChatGateway and ChatService as providers.
@Module({
  providers: [ChatGateway, ChatService],
})
export class ChatModule {}
Der @WebSocketGateway Dekorator akzeptiert verschiedene
Konfigurationsoptionen:
// The port is the first positional argument of @WebSocketGateway. A `port`
// key inside the options object is not read by NestJS, so the gateway would
// silently attach to the main HTTP server instead.
@WebSocketGateway(3001, { // Specific port (optional)
  namespace: '/chat', // Namespace used to organise connections
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(',') || ['http://localhost:3000'],
    credentials: true,
  },
  transports: ['websocket', 'polling'], // Available transport methods
})
NestJS nutzt standardmäßig Socket.IO, eine robuste WebSocket-Bibliothek mit Fallback-Mechanismen und erweiterten Features.
import { Injectable } from '@nestjs/common';
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
/**
 * Gateway that exposes helper methods for pushing notifications to all
 * clients, to a single client, to everyone except one client, or to a
 * notification group (a Socket.IO room named `notifications:<groupId>`).
 */
@Injectable()
@WebSocketGateway()
export class NotificationGateway {
  @WebSocketServer()
  server: Server;

  /** Emits `event` with `data` to every connected client. */
  broadcastToAll(event: string, data: any) {
    this.server.emit(event, data);
  }

  /** Emits `event` to one client; every socket is in a room named after its id. */
  sendToClient(clientId: string, event: string, data: any) {
    this.server.to(clientId).emit(event, data);
  }

  /**
   * Emits `event` to every client except `excludeClientId`.
   *
   * `Server` has no `broadcast` property, and `.to(id)` would address the
   * very client that should be skipped, so `except()` is used instead.
   */
  broadcastExcept(excludeClientId: string, event: string, data: any) {
    this.server.except(excludeClientId).emit(event, data);
  }

  /** Adds the calling socket to a notification group and acknowledges it. */
  @SubscribeMessage('joinNotificationGroup')
  handleJoinGroup(
    @MessageBody() groupId: string,
    @ConnectedSocket() client: Socket,
  ) {
    client.join(`notifications:${groupId}`);
    client.emit('joinedGroup', { groupId, success: true });
  }

  /** Business-logic entry point: emits `notification` to all members of a group. */
  async sendGroupNotification(groupId: string, notification: any) {
    this.server.to(`notifications:${groupId}`).emit('notification', notification);
  }
}
// client.ts
import { io, Socket } from 'socket.io-client';
/**
 * Browser-side wrapper around a Socket.IO connection to the `/chat`
 * namespace. It authenticates with the token stored in `localStorage`.
 */
class SocketClient {
  private socket: Socket;

  constructor() {
    const authToken = localStorage.getItem('authToken');
    this.socket = io('http://localhost:3000/chat', {
      auth: { token: authToken },
      transports: ['websocket'],
    });
    this.setupEventListeners();
  }

  /** Registers the logging handlers for connect, message and disconnect. */
  private setupEventListeners() {
    const { socket } = this;
    socket.on('connect', () => console.log('Verbunden mit Server'));
    socket.on('message', (payload) => console.log('Neue Nachricht:', payload));
    socket.on('disconnect', () => console.log('Verbindung getrennt'));
  }

  /** Sends a chat message addressed to `room`. */
  sendMessage(text: string, room: string) {
    const payload = { text, room };
    this.socket.emit('message', payload);
  }

  /** Asks the server to add this socket to `room`. */
  joinRoom(room: string) {
    this.socket.emit('joinRoom', room);
  }
}
Real-time Events ermöglichen es, Daten sofort zwischen Client und Server zu synchronisieren.
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
// Event-Typen definieren
/** Payload of the internal `user.status.changed` event. */
export interface UserStatusEvent {
  /** Identifier of the user whose status changed. */
  userId: string;
  /** New presence status. */
  status: 'online' | 'offline' | 'away';
  /** Time at which the change was recorded. */
  timestamp: Date;
}
/** Payload of the internal `chat.message.created` event. */
export interface ChatMessageEvent {
  /** Identifier of the message. */
  id: string;
  /** Room the message belongs to; used to build the `room:<roomId>` target. */
  roomId: string;
  /** Identifier of the author. */
  userId: string;
  /** Message text. */
  message: string;
  /** Creation time of the message. */
  timestamp: Date;
}
/**
 * Bridges internal application events to connected WebSocket clients:
 * status changes go to everyone, new chat messages to the matching room.
 */
@Injectable()
@WebSocketGateway()
export class RealtimeGateway {
  @WebSocketServer()
  server: Server;

  constructor(private eventEmitter: EventEmitter2) {
    // Subscribe to the internal events once, at construction time.
    this.eventEmitter.on('user.status.changed', (event: UserStatusEvent) =>
      this.handleUserStatusChange(event),
    );
    this.eventEmitter.on('chat.message.created', (event: ChatMessageEvent) =>
      this.handleNewMessage(event),
    );
  }

  /** Pushes a status update to all connected clients. */
  private handleUserStatusChange(event: UserStatusEvent) {
    const { userId, status, timestamp } = event;
    this.server.emit('userStatusUpdate', { userId, status, timestamp });
  }

  /** Pushes a new message to every client in the message's room. */
  private handleNewMessage(event: ChatMessageEvent) {
    const targetRoom = `room:${event.roomId}`;
    this.server.to(targetRoom).emit('newMessage', event);
  }
}
// Service für Business Logic
@Injectable()
/**
 * Business-logic service for presence: persists a status change and then
 * publishes it as an internal `user.status.changed` event.
 */
export class UserStatusService {
  constructor(private eventEmitter: EventEmitter2) {}

  /**
   * Stores the new status, then emits the event that drives the real-time update.
   *
   * @param userId User whose status changes.
   * @param status New presence status.
   */
  async updateUserStatus(userId: string, status: 'online' | 'offline' | 'away') {
    // Persist first so listeners never see a status that was not saved.
    await this.saveStatusToDatabase(userId, status);

    const statusChange: UserStatusEvent = {
      userId,
      status,
      timestamp: new Date(),
    };
    this.eventEmitter.emit('user.status.changed', statusChange);
  }

  /** Placeholder for the database update. */
  private async saveStatusToDatabase(userId: string, status: string) {
    // Database update logic goes here.
  }
}
import { Observable, fromEvent, merge } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';
/**
 * Gateway that turns per-client socket events into RxJS streams and relays
 * them to all other clients.
 */
@WebSocketGateway()
export class ReactiveGateway {
  @WebSocketServer()
  server: Server;

  /**
   * Builds the event streams for a new client and subscribes to them.
   * The subscription is released when the client disconnects, so a pending
   * debounce timer cannot fire for a socket that is already gone.
   */
  handleConnection(client: Socket) {
    // Create observables for the client's events.
    const messageStream$ = fromEvent(client, 'message').pipe(
      map((data: any) => ({ type: 'message', data, clientId: client.id })),
    );
    const typingStream$ = fromEvent(client, 'typing').pipe(
      map((data: any) => ({ type: 'typing', data, clientId: client.id })),
      debounceTime(300), // Debounce typing events
    );

    // Combine both streams and subscribe.
    const subscription = merge(messageStream$, typingStream$).subscribe((event) => {
      this.handleClientEvent(event);
    });

    client.once('disconnect', () => subscription.unsubscribe());
  }

  /** Dispatches a stream event to the matching broadcast helper. */
  private handleClientEvent(event: any) {
    switch (event.type) {
      case 'message':
        this.broadcastMessage(event.data, event.clientId);
        break;
      case 'typing':
        this.broadcastTyping(event.data, event.clientId);
        break;
    }
  }

  /**
   * Relays a message to everyone except the sender. `Server` has no
   * `broadcast` property and `.to(senderId)` would echo to the sender only,
   * so `except()` is used.
   */
  private broadcastMessage(data: any, senderId: string) {
    this.server.except(senderId).emit('message', data);
  }

  /** Relays a typing indicator to everyone except the sender. */
  private broadcastTyping(data: any, senderId: string) {
    this.server.except(senderId).emit('userTyping', data);
  }
}
Rooms ermöglichen es, Clients in logische Gruppen zu organisieren und gezielte Nachrichten zu senden.
/**
 * Gateway that manages room membership.
 *
 * Besides the Socket.IO rooms it keeps two in-memory indexes (user -> rooms
 * and room -> users) so member lists can be reported without an adapter query.
 *
 * NOTE(review): `joinRoom` and `leaveRoom` take `userId` from the message
 * payload; confirm whether it should be derived from the authenticated
 * handshake instead.
 */
@Injectable()
@WebSocketGateway()
export class RoomGateway {
  @WebSocketServer()
  server: Server;

  private userRooms = new Map<string, Set<string>>(); // userId -> set of roomIds
  private roomMembers = new Map<string, Set<string>>(); // roomId -> set of userIds

  /**
   * Adds the client to a room after an authorization check, notifies the
   * other members and returns the current member list to the client.
   */
  @SubscribeMessage('joinRoom')
  async handleJoinRoom(
    @MessageBody() data: { roomId: string; userId: string },
    @ConnectedSocket() client: Socket,
  ) {
    const { roomId, userId } = data;

    // Check authorization.
    if (!(await this.canUserJoinRoom(userId, roomId))) {
      client.emit('error', { message: 'Zugang zu diesem Raum verweigert' });
      return;
    }

    // Add the client to the room and update the tracking indexes.
    client.join(roomId);
    this.addUserToRoom(userId, roomId);

    // Notify the other members.
    client.to(roomId).emit('userJoined', {
      userId,
      roomId,
      timestamp: new Date(),
    });

    // Confirm the join to the client.
    client.emit('joinedRoom', {
      roomId,
      members: Array.from(this.roomMembers.get(roomId) || []),
    });
  }

  /** Removes the client from a room and notifies the remaining members. */
  @SubscribeMessage('leaveRoom')
  handleLeaveRoom(
    @MessageBody() data: { roomId: string; userId: string },
    @ConnectedSocket() client: Socket,
  ) {
    const { roomId, userId } = data;
    client.leave(roomId);
    this.removeUserFromRoom(userId, roomId);

    // Notify the other members.
    client.to(roomId).emit('userLeft', {
      userId,
      roomId,
      timestamp: new Date(),
    });
  }

  /** Reports tracked members and the number of sockets currently in the room. */
  @SubscribeMessage('getRoomInfo')
  handleGetRoomInfo(
    @MessageBody() roomId: string,
    @ConnectedSocket() client: Socket,
  ) {
    const members = this.roomMembers.get(roomId) || new Set();
    const socketsInRoom = this.server.sockets.adapter.rooms.get(roomId);
    client.emit('roomInfo', {
      roomId,
      memberCount: members.size,
      onlineCount: socketsInRoom?.size || 0,
      members: Array.from(members),
    });
  }

  /** Removes the disconnected user from every room it was tracked in. */
  handleDisconnect(client: Socket) {
    const userId = this.getUserIdFromSocket(client);
    if (userId) {
      this.removeUserFromAllRooms(userId);
    }
  }

  /** Records the membership in both indexes, creating the sets on demand. */
  private addUserToRoom(userId: string, roomId: string) {
    if (!this.userRooms.has(userId)) {
      this.userRooms.set(userId, new Set());
    }
    this.userRooms.get(userId)!.add(roomId);

    if (!this.roomMembers.has(roomId)) {
      this.roomMembers.set(roomId, new Set());
    }
    this.roomMembers.get(roomId)!.add(userId);
  }

  /** Removes the membership from both indexes and drops sets that became empty. */
  private removeUserFromRoom(userId: string, roomId: string) {
    this.userRooms.get(userId)?.delete(roomId);
    this.roomMembers.get(roomId)?.delete(userId);

    // Clean up empty sets.
    if (this.userRooms.get(userId)?.size === 0) {
      this.userRooms.delete(userId);
    }
    if (this.roomMembers.get(roomId)?.size === 0) {
      this.roomMembers.delete(roomId);
    }
  }

  /**
   * Announces the user's departure in each of its rooms and removes it from
   * both indexes. It delegates to `removeUserFromRoom` so rooms that become
   * empty are dropped from `roomMembers` rather than kept as empty sets.
   */
  private removeUserFromAllRooms(userId: string) {
    const joinedRooms = this.userRooms.get(userId);
    if (!joinedRooms) {
      return;
    }
    // Iterate over a snapshot because removeUserFromRoom mutates the set.
    for (const roomId of [...joinedRooms]) {
      this.server.to(roomId).emit('userLeft', {
        userId,
        roomId,
        timestamp: new Date(),
      });
      this.removeUserFromRoom(userId, roomId);
    }
    this.userRooms.delete(userId);
  }

  /** Placeholder authorization check; currently allows every join. */
  private async canUserJoinRoom(userId: string, roomId: string): Promise<boolean> {
    // Authorization logic would go here, e.g. a database query or role check.
    return true;
  }

  /** Reads the user id from the handshake auth payload, or null if absent. */
  private getUserIdFromSocket(socket: Socket): string | null {
    return socket.handshake.auth?.userId || null;
  }
}
@Injectable()
/**
 * Persists chat messages and delivers them through the room gateway,
 * either to a regular room or to a deterministic private room for two users.
 */
export class RoomMessageService {
  constructor(private roomGateway: RoomGateway) {}

  /**
   * Saves a message and emits it as `newMessage` to every socket in the room.
   *
   * @returns The stored message record.
   */
  async sendMessageToRoom(roomId: string, message: any, senderId: string) {
    const savedMessage = await this.saveMessage(message, roomId, senderId);
    this.roomGateway.server.to(roomId).emit('newMessage', savedMessage);
    return savedMessage;
  }

  /**
   * Saves a private message and emits it as `privateMessage` to the private
   * room shared by both users.
   *
   * NOTE(review): nothing visible here joins sockets to the private room;
   * confirm that both users' sockets are members before relying on delivery.
   *
   * @returns The stored message record.
   */
  async sendPrivateMessage(fromUserId: string, toUserId: string, message: any) {
    const privateRoomId = this.generatePrivateRoomId(fromUserId, toUserId);
    const savedMessage = await this.saveMessage(message, privateRoomId, fromUserId);
    this.roomGateway.server.to(privateRoomId).emit('privateMessage', savedMessage);
    return savedMessage;
  }

  /** Builds the same room id for a pair of users regardless of argument order. */
  private generatePrivateRoomId(userId1: string, userId2: string): string {
    const sortedIds = [userId1, userId2].sort();
    return `private:${sortedIds[0]}:${sortedIds[1]}`;
  }

  /**
   * Placeholder for the persistence logic.
   *
   * The payload is spread first so that every server-generated field,
   * including `id`, overrides a same-named field supplied by the client.
   */
  private async saveMessage(message: any, roomId: string, senderId: string) {
    return {
      ...message,
      id: Date.now().toString(),
      roomId,
      senderId,
      timestamp: new Date(),
    };
  }
}
Sicherheit ist bei WebSocket-Verbindungen von entscheidender Bedeutung, da sie lange bestehen und sensible Daten übertragen können.
import { Injectable, UnauthorizedException } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
import { Socket } from 'socket.io';
@Injectable()
/**
 * Validates the JWT presented by a WebSocket client and attaches the decoded
 * payload to the socket.
 */
export class WsAuthService {
  constructor(private jwtService: JwtService) {}

  /**
   * Verifies the client's token and stores the payload in `socket.data.user`.
   *
   * @returns The verified token payload.
   * @throws UnauthorizedException 'No token provided' when no token is found,
   *   'Invalid token' when verification fails.
   */
  async validateConnection(socket: Socket): Promise<any> {
    try {
      const token = this.extractTokenFromSocket(socket);
      const payload = await this.jwtService.verifyAsync(token);
      // Attach the user information to the socket.
      socket.data.user = payload;
      return payload;
    } catch (error) {
      // Keep the specific reason instead of masking it as "Invalid token".
      if (error instanceof UnauthorizedException) {
        throw error;
      }
      throw new UnauthorizedException('Invalid token');
    }
  }

  /**
   * Looks for a token in the handshake auth payload, then in the
   * Authorization header of the handshake and of the underlying request.
   *
   * @throws UnauthorizedException when none of the sources holds a token.
   */
  private extractTokenFromSocket(socket: Socket): string {
    // Only strip the scheme at the start of the header, ignoring its case.
    const stripBearer = (header?: string) => header?.replace(/^Bearer\s+/i, '');
    const token =
      socket.handshake.auth?.token ||
      stripBearer(socket.handshake.headers?.authorization) ||
      stripBearer(socket.request.headers?.authorization);
    if (!token) {
      throw new UnauthorizedException('No token provided');
    }
    return token;
  }
}
// Auth Guard für WebSockets
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
@Injectable()
/**
 * Guard for WebSocket handlers: lets a message through only when the
 * client's connection validates, otherwise reports the failure and
 * disconnects the client.
 */
export class WsAuthGuard implements CanActivate {
  constructor(private wsAuthService: WsAuthService) {}

  /**
   * @returns true when validation succeeds; false after emitting `error`
   *   and disconnecting the socket.
   */
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const client = context.switchToWs().getClient<Socket>();

    let authenticated = true;
    try {
      await this.wsAuthService.validateConnection(client);
    } catch {
      authenticated = false;
    }

    if (!authenticated) {
      client.emit('error', { message: 'Authentication failed' });
      client.disconnect();
    }
    return authenticated;
  }
}
import { UseGuards } from '@nestjs/common';
/**
 * Chat gateway that authenticates every connection and applies role checks
 * to secure messages.
 *
 * The class-level guard already covers every message handler, so handlers do
 * not repeat it; repeating it would verify the token twice per message.
 */
@UseGuards(WsAuthGuard)
@WebSocketGateway({
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(','),
    credentials: true,
  },
})
export class AuthenticatedChatGateway {
  @WebSocketServer()
  server: Server;

  constructor(private wsAuthService: WsAuthService) {}

  /**
   * Validates the new connection, joins the user's personal room and
   * announces the user as online. Clients that fail validation are disconnected.
   */
  async handleConnection(client: Socket) {
    try {
      const user = await this.wsAuthService.validateConnection(client);
      console.log(`Authenticated user connected: ${user.username}`);
      // Join the user-specific room.
      client.join(`user:${user.userId}`);
      // Broadcast the online status.
      this.broadcastUserStatus(user.userId, 'online');
    } catch (error) {
      client.emit('authError', { message: 'Authentication failed' });
      client.disconnect();
    }
  }

  /** Announces the user as offline if the socket had been authenticated. */
  handleDisconnect(client: Socket) {
    const user = client.data.user;
    if (user) {
      this.broadcastUserStatus(user.userId, 'offline');
    }
  }

  /**
   * Processes a secure message after checking that the user holds the role
   * required for `data.action`.
   */
  @SubscribeMessage('secureMessage')
  handleSecureMessage(
    @MessageBody() data: any,
    @ConnectedSocket() client: Socket,
  ) {
    const user = client.data.user;
    // Authorization for the specific action.
    if (!this.canUserPerformAction(user, data?.action)) {
      client.emit('error', { message: 'Insufficient permissions' });
      return;
    }
    this.processSecureMessage(data, user);
  }

  /** Emits `userStatusChange` to all connected clients. */
  private broadcastUserStatus(userId: string, status: string) {
    this.server.emit('userStatusChange', { userId, status });
  }

  /**
   * Role-based authorization. A missing user or a missing role list is
   * treated as "no roles" instead of throwing.
   */
  private canUserPerformAction(user: any, action: string): boolean {
    const userRoles: string[] = Array.isArray(user?.roles) ? user.roles : [];
    return userRoles.includes(this.getRequiredRoleForAction(action));
  }

  /**
   * Returns the role needed for an action, defaulting to 'user'.
   * A Map is used so inherited object keys such as 'constructor' can never
   * be mistaken for a configured action.
   */
  private getRequiredRoleForAction(action: string): string {
    const actionRoleMap = new Map<string, string>([
      ['deleteMessage', 'moderator'],
      ['banUser', 'admin'],
      ['createRoom', 'moderator'],
    ]);
    return actionRoleMap.get(action) ?? 'user';
  }

  /** Placeholder for secure message processing with the user context. */
  private processSecureMessage(data: any, user: any) {
    // Secure message processing with user context goes here.
  }
}
import { Injectable } from '@nestjs/common';
import { Socket } from 'socket.io';
/**
 * Authenticates a WebSocket client through the `sessionId` cookie sent with
 * the underlying HTTP request.
 */
@Injectable()
export class SessionAuthService {
  /**
   * Resolves the user stored in the client's session.
   *
   * @returns The `user` object of the loaded session.
   * @throws UnauthorizedException when the cookie is missing or the session
   *   cannot be loaded or holds no user.
   */
  async validateSocketSession(socket: Socket): Promise<any> {
    const sessionId = this.extractSessionId(socket.request.headers.cookie);
    if (!sessionId) {
      throw new UnauthorizedException('No session found');
    }

    // Load the session from the store (Redis, database, etc.).
    const session = await this.loadSession(sessionId);
    if (!session || !session.user) {
      throw new UnauthorizedException('Invalid session');
    }
    return session.user;
  }

  /**
   * Returns the value of the first `sessionId` cookie, or undefined.
   * Everything after the first `=` is kept, so ids that themselves contain
   * `=` (for example base64 padding) are not truncated.
   */
  private extractSessionId(cookieHeader: string | undefined): string | undefined {
    const prefix = 'sessionId=';
    const cookie = cookieHeader
      ?.split(';')
      .map((part) => part.trim())
      .find((part) => part.startsWith(prefix));
    return cookie?.slice(prefix.length);
  }

  /** Placeholder for the session store lookup. */
  private async loadSession(sessionId: string): Promise<any> {
    // Session store implementation, e.g. Redis or a database.
    return null; // Placeholder
  }
}
Broadcasting ermöglicht es, Nachrichten an mehrere Clients gleichzeitig zu senden, während Namespaces die logische Trennung verschiedener Anwendungsbereiche ermöglichen.
// Chat Namespace
// Gateway bound to the `/chat` namespace.
@WebSocketGateway({ namespace: '/chat' })
export class ChatNamespace {
  @WebSocketServer()
  server: Server;

  // Re-emits every incoming `message` payload to all clients of this gateway's server.
  @SubscribeMessage('message')
  handleChatMessage(@MessageBody() data: any) {
    this.server.emit('message', data);
  }
}
// Notifications Namespace
/**
 * Gateway bound to the `/notifications` namespace. Clients subscribe to
 * topics, each of which maps to a room named `topic:<name>`.
 */
@WebSocketGateway({ namespace: '/notifications' })
export class NotificationsNamespace {
  @WebSocketServer()
  server: Server;

  /** Joins the calling socket to the room of every requested topic. */
  @SubscribeMessage('subscribe')
  handleSubscription(@MessageBody() data: { topics: string[] }, @ConnectedSocket() client: Socket) {
    for (const topic of data.topics) {
      const topicRoom = `topic:${topic}`;
      client.join(topicRoom);
    }
  }

  /** Emits `notification` to every socket subscribed to `topic`. */
  async sendNotificationToTopic(topic: string, notification: any) {
    const topicRoom = `topic:${topic}`;
    this.server.to(topicRoom).emit('notification', notification);
  }
}
// Admin Namespace
// Gateway bound to the `/admin` namespace, with AdminAuthGuard applied.
// NOTE(review): AdminAuthGuard is not defined in the visible source, and
// nothing here restricts who may connect to the namespace — confirm that
// connections themselves are limited to administrators.
@WebSocketGateway({ namespace: '/admin' })
@UseGuards(AdminAuthGuard)
export class AdminNamespace {
  @WebSocketServer()
  server: Server;

  // Re-emits the alert to every socket of this gateway's server.
  @SubscribeMessage('systemAlert')
  handleSystemAlert(@MessageBody() alert: any) {
    // Intended to reach administrators only.
    this.server.emit('systemAlert', alert);
  }
}
@Injectable()
/**
 * Targeted broadcasting helpers: by region, by role, by arbitrary predicate,
 * and in rate-limited batches of users.
 */
export class BroadcastService {
  constructor(
    @InjectRepository(User) private userRepository: Repository<User>,
    @InjectRepository(Room) private roomRepository: Repository<Room>,
  ) {}

  /** Emits `event` to every socket whose user belongs to `region`. */
  async broadcastToRegion(region: string, event: string, data: any) {
    await this.emitToMatching((socket) => socket.data.user?.region === region, event, data);
  }

  /** Emits `event` to every socket whose user holds `role`. */
  async broadcastToRole(role: string, event: string, data: any) {
    await this.emitToMatching((socket) => socket.data.user?.roles?.includes(role), event, data);
  }

  /**
   * Emits `event` to every socket whose user satisfies `condition`.
   * The predicate receives `socket.data.user` as stored on the socket.
   */
  async conditionalBroadcast(
    condition: (user: any) => boolean,
    event: string,
    data: any
  ) {
    await this.emitToMatching((socket) => condition(socket.data.user), event, data);
  }

  /**
   * Emits `event` to the personal room (`user:<id>`) of each recipient,
   * pausing `delayMs` between batches of `batchSize` recipients.
   *
   * @throws RangeError when `batchSize` is not a positive integer; such a
   *   value would otherwise never advance the loop.
   */
  async batchBroadcast(
    recipients: string[],
    event: string,
    data: any,
    batchSize: number = 100,
    delayMs: number = 100
  ) {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
      throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
    }
    const server = this.getSocketServer();
    for (let i = 0; i < recipients.length; i += batchSize) {
      const batch = recipients.slice(i, i + batchSize);
      batch.forEach(userId => {
        server.to(`user:${userId}`).emit(event, data);
      });
      // Rate limiting: pause only when another batch follows.
      if (i + batchSize < recipients.length) {
        await this.delay(delayMs);
      }
    }
  }

  /** Fetches all sockets and emits `event` to those accepted by `matches`. */
  private async emitToMatching(
    matches: (socket: any) => boolean,
    event: string,
    data: any
  ) {
    const sockets = await this.getSocketServer().fetchSockets();
    for (const socket of sockets) {
      if (matches(socket)) {
        socket.emit(event, data);
      }
    }
  }

  /** Placeholder: must return the server instance of one of the gateways. */
  private getSocketServer(): Server {
    // The implementation depends on the specific architecture.
    return null; // Placeholder
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
@Injectable()
/**
 * Registry of namespace servers that allows events to be forwarded from one
 * namespace to another, or broadcast to all of them.
 */
export class CrossNamespaceService {
  private namespaceServers = new Map<string, Server>();

  /** Registers (or replaces) the server that handles namespace `name`. */
  registerNamespace(name: string, server: Server) {
    this.namespaceServers.set(name, server);
  }

  /**
   * Forwards an event to another namespace, tagging the payload with
   * `forwardedFrom`.
   *
   * @param fromNamespace Name reported as the origin.
   * @param toNamespace Registered name of the target namespace; a warning is
   *   logged and nothing is sent when it is unknown.
   * @param filter Optional predicate; when given, only matching sockets
   *   receive the event. That path is asynchronous and fire-and-forget:
   *   failures are logged, not thrown.
   */
  forwardEvent(
    fromNamespace: string,
    toNamespace: string,
    event: string,
    data: any,
    filter?: (socket: Socket) => boolean
  ) {
    const targetServer = this.namespaceServers.get(toNamespace);
    if (!targetServer) {
      console.warn(`Namespace ${toNamespace} not found`);
      return;
    }

    const payload = { ...data, forwardedFrom: fromNamespace };
    if (!filter) {
      // Broadcast to everyone in the target namespace.
      targetServer.emit(event, payload);
      return;
    }

    // Filtered forwarding. The rejection handler keeps a failing socket
    // lookup from surfacing as an unhandled promise rejection.
    targetServer
      .fetchSockets()
      .then(sockets => {
        sockets.filter(filter).forEach(socket => {
          socket.emit(event, payload);
        });
      })
      .catch(error => {
        console.error(`Forwarding ${event} to namespace ${toNamespace} failed`, error);
      });
  }

  /** Emits an event on every registered namespace, tagging each payload with its namespace name. */
  globalBroadcast(event: string, data: any) {
    this.namespaceServers.forEach((server, namespace) => {
      server.emit(event, { ...data, namespace });
    });
  }
}
// Usage in Gateway
/**
 * Usage example: a `/chat` gateway that registers its server with
 * CrossNamespaceService and forwards important messages to admin sockets.
 */
@WebSocketGateway({ namespace: '/chat' })
export class ChatNamespace {
  @WebSocketServer()
  server: Server;

  constructor(private crossNamespaceService: CrossNamespaceService) {}

  /** Registers this gateway's server under the name 'chat' once it is initialised. */
  afterInit() {
    this.crossNamespaceService.registerNamespace('chat', this.server);
  }

  /** Forwards the message to the admin namespace as `chatAlert`, for admin users only. */
  @SubscribeMessage('importantMessage')
  handleImportantMessage(@MessageBody() data: any) {
    this.crossNamespaceService.forwardEvent(
      'chat',
      'admin',
      'chatAlert',
      data,
      (socket) => ChatNamespace.isAdmin(socket.data.user)
    );
  }

  /**
   * Accepts both user shapes: a `roles` array, as used by the other
   * role checks in this file, and a single `role` string.
   */
  private static isAdmin(user: any): boolean {
    if (Array.isArray(user?.roles) && user.roles.includes('admin')) {
      return true;
    }
    return user?.role === 'admin';
  }
}
@Injectable()
/**
 * Queues broadcasts per room and drains the queues in batches, grouping the
 * events of a batch by event name so each name is emitted once per room.
 */
export class OptimizedBroadcastService {
  private broadcastQueue = new Map<string, any[]>();
  private isProcessing = false;

  /**
   * Queues an event for a room and starts the drain loop if it is idle.
   * The loop runs in the background; its failures are logged, not thrown.
   */
  async queueBroadcast(roomId: string, event: string, data: any) {
    if (!this.broadcastQueue.has(roomId)) {
      this.broadcastQueue.set(roomId, []);
    }
    this.broadcastQueue.get(roomId)!.push({ event, data, timestamp: Date.now() });

    if (!this.isProcessing) {
      void this.processBroadcastQueue().catch((error) => {
        console.error('Broadcast queue processing failed', error);
      });
    }
  }

  /**
   * Drains all room queues, up to 10 events per room per round, with a short
   * pause between rounds. `isProcessing` is reset in `finally` so that a
   * failing batch cannot block every later broadcast.
   */
  private async processBroadcastQueue() {
    this.isProcessing = true;
    try {
      while (this.broadcastQueue.size > 0) {
        const promises: Promise<void>[] = [];
        for (const [roomId, events] of this.broadcastQueue.entries()) {
          if (events.length > 0) {
            const batchedEvents = events.splice(0, 10); // Batch of 10 events
            promises.push(
              this.processBatchedEvents(roomId, batchedEvents)
            );
          }
          // Remove empty queues.
          if (events.length === 0) {
            this.broadcastQueue.delete(roomId);
          }
        }
        await Promise.all(promises);
        await this.delay(50); // Short pause between batches
      }
    } finally {
      this.isProcessing = false;
    }
  }

  /** Emits one batch to a room: one emit per event name, carrying an array of payloads. */
  private async processBatchedEvents(roomId: string, events: any[]) {
    const server = this.getSocketServer();
    const groupedEvents = this.groupEventsByType(events);
    for (const [eventType, eventData] of groupedEvents.entries()) {
      server.to(roomId).emit(eventType, eventData);
    }
  }

  /** Groups queued entries by event name, keeping the payloads in order. */
  private groupEventsByType(events: any[]): Map<string, any[]> {
    const grouped = new Map<string, any[]>();
    events.forEach(({ event, data }) => {
      if (!grouped.has(event)) {
        grouped.set(event, []);
      }
      grouped.get(event)!.push(data);
    });
    return grouped;
  }

  /** Wraps payloads whose JSON form exceeds 1024 characters in a compressed envelope. */
  private compressLargePayload(data: any): any {
    const stringified = JSON.stringify(data);
    if (stringified.length > 1024) { // 1KB threshold
      return {
        compressed: true,
        data: this.compress(stringified),
      };
    }
    return data;
  }

  /** Placeholder: returns the input unchanged. */
  private compress(data: string): string {
    // A real compression, e.g. with zlib, would be implemented here.
    return data; // Placeholder
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /** Placeholder: must return the Socket.IO server instance. */
  private getSocketServer(): Server {
    return null; // Placeholder
  }
}
import { Injectable } from '@nestjs/common';
import { ThrottlerGuard } from '@nestjs/throttler';
// Rate Limiting für WebSocket-Events
/**
 * Rate-limiting guard for WebSocket events. Rejects sockets without an
 * authenticated user id and otherwise defers to ThrottlerGuard.
 *
 * NOTE(review): depending on the installed @nestjs/throttler version the base
 * class may need further overrides for WebSocket contexts — confirm.
 */
@Injectable()
export class WsThrottlerGuard extends ThrottlerGuard {
  /** Denies the request when `socket.data.user.id` is missing or falsy. */
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const client = context.switchToWs().getClient();
    const isIdentified = Boolean(client.data.user?.id);
    return isIdentified ? super.canActivate(context) : false;
  }

  /** Tracks by user id, falling back to the id of the request object. */
  protected getTracker(req: Record<string, any>): string {
    const trackedUserId = req.data.user?.id;
    return trackedUserId || req.id;
  }
}
// Input Validation für WebSocket-Nachrichten
import { IsString, IsNotEmpty, Length } from 'class-validator';
// Shape of an incoming chat message, with class-validator constraints.
export class ChatMessageDto {
  // Message text: a non-empty string of 1 to 500 characters.
  @IsString()
  @IsNotEmpty()
  @Length(1, 500)
  message: string;

  // Target room: a non-empty string.
  @IsString()
  @IsNotEmpty()
  roomId: string;
}
// Gateway that applies WsThrottlerGuard to its handlers and validates
// incoming `message` payloads against ChatMessageDto.
@UseGuards(WsThrottlerGuard)
@WebSocketGateway()
export class SecureChatGateway {
  // Runs a ValidationPipe on the payload before delegating to processValidatedMessage.
  @UsePipes(new ValidationPipe())
  @SubscribeMessage('message')
  handleMessage(
    @MessageBody() data: ChatMessageDto,
    @ConnectedSocket() client: Socket,
  ) {
    // The message has already been validated at this point.
    this.processValidatedMessage(data, client);
  }

  // Placeholder for the processing of a validated message.
  private processValidatedMessage(data: ChatMessageDto, client: Socket) {
    // Safe processing of the validated message goes here.
  }
}
// Connection Pool Management
@Injectable()
/**
 * Tracks at most one socket per user, enforces a global connection limit and
 * evicts connections that have been inactive for too long.
 */
export class ConnectionManager {
  private connections = new Map<string, Socket>();
  private readonly maxConnections = 10000;

  /**
   * Registers `socket` for `userId`, disconnecting the user's previous socket.
   *
   * Replacing an existing connection does not increase the connection count,
   * so it is allowed even when the pool is full.
   *
   * @returns false when the pool is full and the user has no connection yet.
   */
  addConnection(userId: string, socket: Socket): boolean {
    const existingConnection = this.connections.get(userId);
    if (existingConnection === socket) {
      return true; // Already registered; do not disconnect the socket itself.
    }
    if (!existingConnection && this.connections.size >= this.maxConnections) {
      return false; // Connection rejected
    }
    // Close the previous connection, if any.
    if (existingConnection) {
      existingConnection.disconnect();
    }
    this.connections.set(userId, socket);
    return true;
  }

  /**
   * Removes the user's connection.
   *
   * @param socket When given, the entry is removed only if it still refers to
   *   this socket, so a late disconnect of a replaced socket cannot evict its
   *   replacement.
   */
  removeConnection(userId: string, socket?: Socket) {
    if (socket && this.connections.get(userId) !== socket) {
      return;
    }
    this.connections.delete(userId);
  }

  /** Number of users currently tracked. */
  getConnectionCount(): number {
    return this.connections.size;
  }

  /**
   * Disconnects and removes connections idle for more than five minutes.
   *
   * Activity is read from `socket.data.lastActivity`, falling back to the
   * numeric handshake timestamp `handshake.issued`. `handshake.time` is a
   * date string, so subtracting it yields NaN and nothing would be removed.
   */
  async cleanupInactiveConnections() {
    const now = Date.now();
    const inactiveThreshold = 5 * 60 * 1000; // 5 minutes
    for (const [userId, socket] of this.connections.entries()) {
      const lastActivity = socket.data.lastActivity || socket.handshake.issued;
      if (now - lastActivity > inactiveThreshold) {
        socket.disconnect();
        this.connections.delete(userId);
      }
    }
  }
}
// Memory-effiziente Event-Verarbeitung
/**
 * Keeps a bounded, per-user buffer of the most recent events.
 */
@Injectable()
export class EfficientEventProcessor {
  private eventBuffer = new Map<string, any[]>();
  private bufferSize = 100;

  /** Appends an event to the user's buffer, dropping the oldest one when the buffer is full. */
  processEvent(userId: string, event: any) {
    if (!this.eventBuffer.has(userId)) {
      this.eventBuffer.set(userId, []);
    }
    const userBuffer = this.eventBuffer.get(userId)!;
    userBuffer.push(event);
    // Keep the buffer within its size limit.
    if (userBuffer.length > this.bufferSize) {
      userBuffer.shift(); // Remove the oldest event
    }
  }

  /**
   * Returns up to `count` of the user's most recent events, oldest first.
   *
   * A `count` that is not positive yields an empty array. Without this check
   * `slice(-0)` would return the whole buffer.
   */
  getRecentEvents(userId: string, count: number = 10): any[] {
    if (!(count > 0)) {
      return [];
    }
    const userBuffer = this.eventBuffer.get(userId) || [];
    return userBuffer.slice(-count);
  }

  /** Discards all buffered events of a user. */
  clearUserBuffer(userId: string) {
    this.eventBuffer.delete(userId);
  }
}
WebSockets in NestJS bieten eine mächtige Grundlage für Echtzeit-Anwendungen. Die typisierte Integration, kombiniert mit den robusten Sicherheits- und Performance-Features, macht NestJS zu einer exzellenten Wahl für moderne Real-time-Anwendungen. Die modulare Architektur ermöglicht es, WebSocket-Funktionalitäten sauber zu organisieren und zu skalieren, während die umfangreichen Konfigurationsmöglichkeiten eine Anpassung an spezifische Anforderungen erlauben.