import { Injectable } from '@angular/core'; import { Observable } from 'rxjs'; /** * Un message d'une conversation IA (vue front). * Aligné sur le DTO ChatMessageDTO côté Java. */ export interface ChatMessage { role: 'user' | 'assistant' | 'system'; content: string; } /** * Événements émis par le flux SSE durant un chat streamé. * - token : un fragment de texte vient d'arriver (à concaténer dans la bulle). * - done : le stream s'est terminé proprement (l'observable va compléter). * - error : une erreur s'est produite côté serveur (l'observable va erreur-compléter). */ export type ChatStreamEvent = | { type: 'token'; value: string } | { type: 'done' } | { type: 'error'; message: string }; /** * Service qui encapsule l'appel SSE au backend Java (POST /api/ai/chat/stream). * * On n'utilise pas EventSource (API navigateur natif) car elle ne supporte * que GET sans body. On fait donc un fetch() avec un ReadableStream qu'on * décode ligne par ligne pour extraire les événements SSE. */ @Injectable({ providedIn: 'root' }) export class AiChatService { private readonly endpoint = 'http://localhost:8080/api/ai/chat/stream'; /** * Streame la réponse de l'IA pour un historique de messages donné. * L'Observable : * - émet `{type: 'token', value}` à chaque fragment reçu ; * - se complete quand `event: done` arrive ; * - erreur-complete (via `throwError`) quand `event: error` arrive ou qu'une erreur réseau survient. * * Annuler la subscription annule proprement le fetch (AbortController). */ streamChat( loreId: string, messages: ChatMessage[], pageId?: string | null ): Observable { return new Observable((subscriber) => { const controller = new AbortController(); // Payload : pageId inclus uniquement s'il est fourni et non vide, pour // garder le comportement "chat générique au Lore" par défaut. const body: Record = { loreId, messages }; if (pageId) body['pageId'] = pageId; fetch(this.endpoint, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' }, body: JSON.stringify(body), signal: controller.signal }) .then(async (response) => { if (!response.ok || !response.body) { subscriber.error(new Error(`HTTP ${response.status}`)); return; } await this.consumeSseStream(response.body, subscriber); }) .catch((err) => { if (controller.signal.aborted) return; // annulation volontaire, silencieuse subscriber.error(err); }); return () => controller.abort(); }); } /** * Consomme un ReadableStream SSE ligne par ligne. * Format attendu (un événement = un bloc séparé par `\n\n`) : * event: done (optionnel, défaut = 'message') * data: {...} (une ou plusieurs lignes, concaténées avec '\n') * (séparateur d'événements) */ private async consumeSseStream( body: ReadableStream, subscriber: { next: (e: ChatStreamEvent) => void; error: (e: unknown) => void; complete: () => void } ): Promise { const reader = body.getReader(); const decoder = new TextDecoder('utf-8'); let buffer = ''; // Événement SSE en cours de construction (accumulé entre lignes vides). let currentEvent: string | null = null; let currentData = ''; const dispatchCurrentEvent = () => { const eventName = currentEvent ?? 'message'; if (eventName === 'error') { const message = this.safeParseMessage(currentData); subscriber.error(new Error(message)); } else if (eventName === 'done') { subscriber.next({ type: 'done' }); subscriber.complete(); } else { // Événement 'message' (défaut) : JSON {"token": "..."} const token = this.safeParseToken(currentData); if (token) subscriber.next({ type: 'token', value: token }); } currentEvent = null; currentData = ''; }; try { while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); // On découpe par lignes ; la dernière (potentiellement incomplète) reste dans buffer. let newlineIdx: number; while ((newlineIdx = buffer.indexOf('\n')) >= 0) { const line = buffer.slice(0, newlineIdx).replace(/\r$/, ''); buffer = buffer.slice(newlineIdx + 1); if (line === '') { // Ligne vide = fin d'un événement SSE : on dispatch ce qu'on a accumulé. if (currentEvent !== null || currentData !== '') { dispatchCurrentEvent(); } continue; } if (line.startsWith('event:')) { currentEvent = line.slice(6).trim(); } else if (line.startsWith('data:')) { const chunk = line.slice(5).replace(/^ /, ''); currentData = currentData ? `${currentData}\n${chunk}` : chunk; } // Autres champs SSE (id:, retry:) ignorés pour le MVP. } } // Fin de stream côté réseau sans event: done explicite → on complete quand même. if (currentEvent !== null || currentData !== '') dispatchCurrentEvent(); subscriber.complete(); } catch (err) { subscriber.error(err); } } private safeParseToken(json: string): string | null { try { const obj = JSON.parse(json) as { token?: string }; return typeof obj.token === 'string' ? obj.token : null; } catch { return null; } } private safeParseMessage(json: string): string { try { const obj = JSON.parse(json) as { message?: string }; return obj.message ?? 'Erreur inconnue côté serveur.'; } catch { return json || 'Erreur inconnue côté serveur.'; } } }