| import { EventSourceParserStream } from 'eventsource-parser/stream'; |
| import type { ParsedEvent } from 'eventsource-parser'; |
|
|
| type TextStreamUpdate = { |
| done: boolean; |
| value: string; |
| |
| citations?: any; |
| |
| error?: any; |
| usage?: ResponseUsage; |
| }; |
|
|
| type ResponseUsage = { |
| |
| prompt_tokens: number; |
| |
| completion_tokens: number; |
| |
| total_tokens: number; |
| |
| [other: string]: unknown; |
| }; |
|
|
| |
| |
| export async function createOpenAITextStream( |
| responseBody: ReadableStream<Uint8Array>, |
| splitLargeDeltas: boolean |
| ): Promise<AsyncGenerator<TextStreamUpdate>> { |
| const eventStream = responseBody |
| .pipeThrough(new TextDecoderStream()) |
| .pipeThrough(new EventSourceParserStream()) |
| .getReader(); |
| let iterator = openAIStreamToIterator(eventStream); |
| if (splitLargeDeltas) { |
| iterator = streamLargeDeltasAsRandomChunks(iterator); |
| } |
| return iterator; |
| } |
|
|
| async function* openAIStreamToIterator( |
| reader: ReadableStreamDefaultReader<ParsedEvent> |
| ): AsyncGenerator<TextStreamUpdate> { |
| while (true) { |
| const { value, done } = await reader.read(); |
| if (done) { |
| yield { done: true, value: '' }; |
| break; |
| } |
| if (!value) { |
| continue; |
| } |
| const data = value.data; |
| if (data.startsWith('[DONE]')) { |
| yield { done: true, value: '' }; |
| break; |
| } |
|
|
| try { |
| const parsedData = JSON.parse(data); |
| console.log(parsedData); |
|
|
| if (parsedData.error) { |
| yield { done: true, value: '', error: parsedData.error }; |
| break; |
| } |
|
|
| if (parsedData.citations) { |
| yield { done: false, value: '', citations: parsedData.citations }; |
| continue; |
| } |
|
|
| yield { |
| done: false, |
| value: parsedData.choices?.[0]?.delta?.content ?? '', |
| usage: parsedData.usage |
| }; |
| } catch (e) { |
| console.error('Error extracting delta from SSE event:', e); |
| } |
| } |
| } |
|
|
| |
| |
| async function* streamLargeDeltasAsRandomChunks( |
| iterator: AsyncGenerator<TextStreamUpdate> |
| ): AsyncGenerator<TextStreamUpdate> { |
| for await (const textStreamUpdate of iterator) { |
| if (textStreamUpdate.done) { |
| yield textStreamUpdate; |
| return; |
| } |
| if (textStreamUpdate.citations) { |
| yield textStreamUpdate; |
| continue; |
| } |
| let content = textStreamUpdate.value; |
| if (content.length < 5) { |
| yield { done: false, value: content }; |
| continue; |
| } |
| while (content != '') { |
| const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length); |
| const chunk = content.slice(0, chunkSize); |
| yield { done: false, value: chunk }; |
| |
| |
| if (document?.visibilityState !== 'hidden') { |
| await sleep(5); |
| } |
| content = content.slice(chunkSize); |
| } |
| } |
| } |
|
|
| const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); |
|
|