codex-proxy / src /proxy /codex-sse.ts
icebear
refactor: decouple AccountPool, split codex-api and web.ts, fix CI (#113)
0c8b3c0 unverified
raw
history blame
2.64 kB
/**
* SSE stream parser for Codex Responses API.
* Pure functions β€” no side effects or external dependencies.
*/
import type { CodexSSEEvent } from "./codex-types.js";
export function parseSSEBlock(block: string): CodexSSEEvent | null {
let event = "";
const dataLines: string[] = [];
for (const line of block.split("\n")) {
if (line.startsWith("event:")) {
event = line.slice(6).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trimStart());
}
}
if (!event && dataLines.length === 0) return null;
const raw = dataLines.join("\n");
if (raw === "[DONE]") return null;
let data: unknown;
try {
data = JSON.parse(raw);
} catch {
data = raw;
}
return { event, data };
}
const MAX_SSE_BUFFER = 10 * 1024 * 1024; // 10MB
export async function* parseSSEStream(
response: Response,
): AsyncGenerator<CodexSSEEvent> {
if (!response.body) {
throw new Error("Response body is null β€” cannot stream");
}
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();
let buffer = "";
let yieldedAny = false;
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += value;
if (buffer.length > MAX_SSE_BUFFER) {
throw new Error(`SSE buffer exceeded ${MAX_SSE_BUFFER} bytes β€” aborting stream`);
}
const parts = buffer.split("\n\n");
buffer = parts.pop()!;
for (const part of parts) {
if (!part.trim()) continue;
const evt = parseSSEBlock(part);
if (evt) {
yieldedAny = true;
yield evt;
}
}
}
// Process remaining buffer
if (buffer.trim()) {
const evt = parseSSEBlock(buffer);
if (evt) {
yieldedAny = true;
yield evt;
}
}
// Non-SSE response detection
if (!yieldedAny && buffer.trim()) {
let errorMessage = buffer.trim();
try {
const parsed = JSON.parse(errorMessage) as Record<string, unknown>;
const errObj = typeof parsed.error === "object" && parsed.error !== null
? (parsed.error as Record<string, unknown>)
: undefined;
errorMessage =
(typeof parsed.detail === "string" ? parsed.detail : null)
?? (typeof errObj?.message === "string" ? errObj.message : null)
?? errorMessage;
} catch { /* use raw text */ }
yield {
event: "error",
data: { error: { type: "error", code: "non_sse_response", message: errorMessage } },
};
}
} finally {
reader.releaseLock();
}
}