/**
 * SSE stream parser for Codex Responses API.
 * Pure functions — no side effects or external dependencies.
 */

import type { CodexSSEEvent } from "./codex-types.js";

/** Any line terminator the SSE wire format permits: CRLF, bare CR, or bare LF. */
const SSE_LINE_BREAK = /\r\n|\r|\n/;

/**
 * A blank line terminates an event block. Only runs of two identical
 * terminators are matched so that a single CRLF is never mistaken for
 * "CR followed by LF", i.e. a blank line.
 */
const SSE_BLOCK_DELIMITER = /\r\n\r\n|\n\n|\r\r/;

/** Matches a line that is an SSE comment (`:`) or carries a known SSE field. */
const SSE_FIELD_LINE = /^(?:event|data|id|retry)?:/;

/**
 * Parses one SSE event block (the text between two blank lines).
 *
 * Only `event:` and `data:` lines are interpreted; every other line is
 * ignored. Multiple `data:` lines are joined with `\n`. The joined payload is
 * JSON-decoded when possible, otherwise it is returned as the raw string.
 *
 * @param block - Text of a single event block; lines may end in LF, CRLF or CR.
 * @returns The parsed event, or `null` when the block has neither an event
 *          name nor data, or when its data is the `[DONE]` terminator.
 */
export function parseSSEBlock(block: string): CodexSSEEvent | null {
  let event = "";
  const dataLines: string[] = [];

  // Splitting on every legal terminator keeps a stray "\r" out of the payload,
  // which would otherwise defeat the "[DONE]" comparison below.
  for (const line of block.split(SSE_LINE_BREAK)) {
    if (line.startsWith("event:")) {
      event = line.slice(6).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice(5).trimStart());
    }
  }

  if (!event && dataLines.length === 0) return null;

  const raw = dataLines.join("\n");
  if (raw === "[DONE]") return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    // Not JSON — hand the text through unchanged.
    data = raw;
  }
  return { event, data };
}

// Upper bound on buffered, not-yet-delimited text. Compared against
// `string.length`, so the unit is UTF-16 code units (~10MB of ASCII).
const MAX_SSE_BUFFER = 10 * 1024 * 1024; // 10MB

/** Returns true when at least one line of `block` is an SSE field or comment. */
function hasSSEFields(block: string): boolean {
  return block.split(SSE_LINE_BREAK).some((line) => SSE_FIELD_LINE.test(line));
}

/**
 * Derives a human-readable message from a non-SSE response body.
 * Prefers a string `detail`, then a string `error.message`, and otherwise
 * falls back to the body text itself (also when the body is not JSON).
 */
function extractErrorMessage(body: string): string {
  try {
    const parsed: unknown = JSON.parse(body);
    if (typeof parsed !== "object" || parsed === null) return body;

    const record = parsed as Record<string, unknown>;
    if (typeof record.detail === "string") return record.detail;

    const err = record.error;
    if (typeof err === "object" && err !== null) {
      const message = (err as Record<string, unknown>).message;
      if (typeof message === "string") return message;
    }
    return body;
  } catch {
    return body; // use raw text
  }
}

/**
 * Incrementally decodes a streaming HTTP response into SSE events.
 *
 * The body is decoded as text, split into blocks on blank lines (LF, CRLF or
 * CR framing) and each block is passed to {@link parseSSEBlock}. Text left in
 * the buffer when the stream ends is parsed as a final block.
 *
 * If the stream ends without producing a single event and some of the body
 * did not look like SSE at all, one synthetic `error` event with code
 * `non_sse_response` is yielded, carrying a message extracted from the body.
 *
 * @param response - Fetch `Response` whose body is to be consumed.
 * @returns An async generator of parsed events.
 * @throws Error when `response.body` is null, or when the undelimited buffer
 *         grows beyond `MAX_SSE_BUFFER`.
 */
export async function* parseSSEStream(
  response: Response,
): AsyncGenerator<CodexSSEEvent> {
  if (!response.body) {
    throw new Error("Response body is null — cannot stream");
  }

  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = "";
  let yieldedAny = false;
  // Body text that carried no SSE fields, kept only until the first real event.
  let nonSSEText = "";

  // Parses one block; blocks that are not SSE at all are remembered so they
  // can be reported if the stream turns out not to be an event stream.
  const handleBlock = (block: string): CodexSSEEvent | null => {
    if (!block.trim()) return null;
    const evt = parseSSEBlock(block);
    if (evt) return evt;
    if (
      !yieldedAny &&
      nonSSEText.length <= MAX_SSE_BUFFER &&
      !hasSSEFields(block)
    ) {
      nonSSEText += (nonSSEText ? "\n\n" : "") + block;
    }
    return null;
  };

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += value;
      if (buffer.length > MAX_SSE_BUFFER) {
        throw new Error(`SSE buffer exceeded ${MAX_SSE_BUFFER} bytes — aborting stream`);
      }

      const parts = buffer.split(SSE_BLOCK_DELIMITER);
      // The last piece is incomplete until its terminating blank line arrives.
      buffer = parts.pop() ?? "";

      for (const part of parts) {
        const evt = handleBlock(part);
        if (evt) {
          yieldedAny = true;
          yield evt;
        }
      }
    }

    // Process remaining buffer
    const lastEvt = handleBlock(buffer);
    if (lastEvt) {
      yieldedAny = true;
      yield lastEvt;
    }

    // Non-SSE response detection
    const leftover = nonSSEText.trim();
    if (!yieldedAny && leftover) {
      yield {
        event: "error",
        data: {
          error: {
            type: "error",
            code: "non_sse_response",
            message: extractErrorMessage(leftover),
          },
        },
      };
    }
  } finally {
    // Cancel so an early exit by the consumer also tears down the piped
    // response body instead of leaving it open; cancel() rejects when the
    // stream has already errored, which is safe to ignore here.
    try {
      await reader.cancel();
    } catch {
      /* stream already errored */
    }
    reader.releaseLock();
  }
}