| import type { MessageFile } from "$lib/types/Message"; |
| import { |
| type MessageUpdate, |
| type MessageStreamUpdate, |
| type MessageToolCallUpdate, |
| MessageToolUpdateType, |
| MessageUpdateType, |
| type MessageToolUpdate, |
| type MessageWebSearchUpdate, |
| type MessageWebSearchGeneralUpdate, |
| type MessageWebSearchSourcesUpdate, |
| type MessageWebSearchErrorUpdate, |
| MessageWebSearchUpdateType, |
| type MessageToolErrorUpdate, |
| type MessageToolResultUpdate, |
| } from "$lib/types/MessageUpdate"; |
| import { env as envPublic } from "$env/dynamic/public"; |
|
|
| export const isMessageWebSearchUpdate = (update: MessageUpdate): update is MessageWebSearchUpdate => |
| update.type === MessageUpdateType.WebSearch; |
| export const isMessageWebSearchGeneralUpdate = ( |
| update: MessageUpdate |
| ): update is MessageWebSearchGeneralUpdate => |
| isMessageWebSearchUpdate(update) && update.subtype === MessageWebSearchUpdateType.Update; |
| export const isMessageWebSearchSourcesUpdate = ( |
| update: MessageUpdate |
| ): update is MessageWebSearchSourcesUpdate => |
| isMessageWebSearchUpdate(update) && update.subtype === MessageWebSearchUpdateType.Sources; |
| export const isMessageWebSearchErrorUpdate = ( |
| update: MessageUpdate |
| ): update is MessageWebSearchErrorUpdate => |
| isMessageWebSearchUpdate(update) && update.subtype === MessageWebSearchUpdateType.Error; |
|
|
| export const isMessageToolUpdate = (update: MessageUpdate): update is MessageToolUpdate => |
| update.type === MessageUpdateType.Tool; |
| export const isMessageToolCallUpdate = (update: MessageUpdate): update is MessageToolCallUpdate => |
| isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Call; |
| export const isMessageToolResultUpdate = ( |
| update: MessageUpdate |
| ): update is MessageToolResultUpdate => |
| isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Result; |
| export const isMessageToolErrorUpdate = (update: MessageUpdate): update is MessageToolErrorUpdate => |
| isMessageToolUpdate(update) && update.subtype === MessageToolUpdateType.Error; |
|
|
| type MessageUpdateRequestOptions = { |
| base: string; |
| inputs?: string; |
| messageId?: string; |
| isRetry: boolean; |
| isContinue: boolean; |
| webSearch: boolean; |
| tools?: Array<string>; |
| files?: MessageFile[]; |
| }; |
| export async function fetchMessageUpdates( |
| conversationId: string, |
| opts: MessageUpdateRequestOptions, |
| abortSignal: AbortSignal |
| ): Promise<AsyncGenerator<MessageUpdate>> { |
| const abortController = new AbortController(); |
| abortSignal.addEventListener("abort", () => abortController.abort()); |
|
|
| const form = new FormData(); |
|
|
| const optsJSON = JSON.stringify({ |
| inputs: opts.inputs, |
| id: opts.messageId, |
| is_retry: opts.isRetry, |
| is_continue: opts.isContinue, |
| web_search: opts.webSearch, |
| tools: opts.tools, |
| }); |
|
|
| opts.files?.forEach((file) => { |
| const name = file.type + ";" + file.name; |
|
|
| form.append("files", new File([file.value], name, { type: file.mime })); |
| }); |
|
|
| form.append("data", optsJSON); |
|
|
| const response = await fetch(`${opts.base}/conversation/${conversationId}`, { |
| method: "POST", |
| body: form, |
| signal: abortController.signal, |
| }); |
|
|
| if (!response.ok) { |
| const errorMessage = await response |
| .json() |
| .then((obj) => obj.message) |
| .catch(() => `Request failed with status code ${response.status}: ${response.statusText}`); |
| throw Error(errorMessage); |
| } |
| if (!response.body) { |
| throw Error("Body not defined"); |
| } |
|
|
| if (!(envPublic.PUBLIC_SMOOTH_UPDATES === "true")) { |
| return endpointStreamToIterator(response, abortController); |
| } |
|
|
| return smoothAsyncIterator( |
| streamMessageUpdatesToFullWords(endpointStreamToIterator(response, abortController)) |
| ); |
| } |
|
|
| async function* endpointStreamToIterator( |
| response: Response, |
| abortController: AbortController |
| ): AsyncGenerator<MessageUpdate> { |
| const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader(); |
| if (!reader) throw Error("Response for endpoint had no body"); |
|
|
| |
| reader.closed.then(() => abortController.abort()); |
|
|
| |
| abortController.signal.addEventListener("abort", () => reader.cancel()); |
|
|
| |
| |
| let prevChunk = ""; |
| while (!abortController.signal.aborted) { |
| const { done, value } = await reader.read(); |
| if (done) { |
| abortController.abort(); |
| break; |
| } |
| if (!value) continue; |
|
|
| const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value); |
| prevChunk = remainingText; |
| for (const messageUpdate of messageUpdates) yield messageUpdate; |
| } |
| } |
|
|
| function parseMessageUpdates(value: string): { |
| messageUpdates: MessageUpdate[]; |
| remainingText: string; |
| } { |
| const inputs = value.split("\n"); |
| const messageUpdates: MessageUpdate[] = []; |
| for (const input of inputs) { |
| try { |
| messageUpdates.push(JSON.parse(input) as MessageUpdate); |
| } catch (error) { |
| |
| if (error instanceof SyntaxError) { |
| return { |
| messageUpdates, |
| remainingText: inputs.at(-1) ?? "", |
| }; |
| } |
| } |
| } |
| return { messageUpdates, remainingText: "" }; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| async function* streamMessageUpdatesToFullWords( |
| iterator: AsyncGenerator<MessageUpdate> |
| ): AsyncGenerator<MessageUpdate> { |
| let bufferedStreamUpdates: MessageStreamUpdate[] = []; |
|
|
| const endAlphanumeric = /[a-zA-Z0-9À-ž'`]+$/; |
| const beginnningAlphanumeric = /^[a-zA-Z0-9À-ž'`]+/; |
|
|
| for await (const messageUpdate of iterator) { |
| if (messageUpdate.type !== "stream") { |
| yield messageUpdate; |
| continue; |
| } |
| bufferedStreamUpdates.push(messageUpdate); |
|
|
| let lastIndexEmitted = 0; |
| for (let i = 1; i < bufferedStreamUpdates.length; i++) { |
| const prevEndsAlphanumeric = endAlphanumeric.test(bufferedStreamUpdates[i - 1].token); |
| const currBeginsAlphanumeric = beginnningAlphanumeric.test(bufferedStreamUpdates[i].token); |
| const shouldCombine = prevEndsAlphanumeric && currBeginsAlphanumeric; |
| const combinedTooMany = i - lastIndexEmitted >= 5; |
| if (shouldCombine && !combinedTooMany) continue; |
|
|
| |
| yield { |
| type: MessageUpdateType.Stream, |
| token: bufferedStreamUpdates |
| .slice(lastIndexEmitted, i) |
| .map((_) => _.token) |
| .join(""), |
| }; |
| lastIndexEmitted = i; |
| } |
| bufferedStreamUpdates = bufferedStreamUpdates.slice(lastIndexEmitted); |
| } |
| for (const messageUpdate of bufferedStreamUpdates) yield messageUpdate; |
| } |
|
|
| |
| |
| |
| |
| async function* smoothAsyncIterator<T>(iterator: AsyncGenerator<T>): AsyncGenerator<T> { |
| const eventTarget = new EventTarget(); |
| let done = false; |
| const valuesBuffer: T[] = []; |
| const valueTimesMS: number[] = []; |
|
|
| const next = async () => { |
| const obj = await iterator.next(); |
| if (obj.done) { |
| done = true; |
| } else { |
| valuesBuffer.push(obj.value); |
| valueTimesMS.push(performance.now()); |
| next(); |
| } |
| eventTarget.dispatchEvent(new Event("next")); |
| }; |
| next(); |
|
|
| let timeOfLastEmitMS = performance.now(); |
| while (!done || valuesBuffer.length > 0) { |
| |
| const sampledTimesMS = valueTimesMS.slice(-30); |
|
|
| |
| const anomalyThresholdMS = 2000; |
| const anomalyDurationMS = sampledTimesMS |
| .map((time, i, times) => time - times[i - 1]) |
| .slice(1) |
| .filter((time) => time > anomalyThresholdMS) |
| .reduce((a, b) => a + b, 0); |
|
|
| |
| const totalTimeMSBetweenValues = sampledTimesMS.at(-1)! - sampledTimesMS[0]; |
| const timeMSBetweenValues = totalTimeMSBetweenValues - anomalyDurationMS; |
|
|
| const averageTimeMSBetweenValues = Math.min( |
| 200, |
| timeMSBetweenValues / (sampledTimesMS.length - 1) |
| ); |
| const timeSinceLastEmitMS = performance.now() - timeOfLastEmitMS; |
|
|
| |
| const gotNext = await Promise.race([ |
| sleep(Math.max(5, averageTimeMSBetweenValues - timeSinceLastEmitMS)), |
| waitForEvent(eventTarget, "next"), |
| ]); |
|
|
| |
| if (gotNext) continue; |
|
|
| |
| if (valuesBuffer.length === 0) continue; |
|
|
| |
| timeOfLastEmitMS = performance.now(); |
| |
| yield valuesBuffer.shift()!; |
| } |
| } |
|
|
| const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); |
| const waitForEvent = (eventTarget: EventTarget, eventName: string) => |
| new Promise<boolean>((resolve) => |
| eventTarget.addEventListener(eventName, () => resolve(true), { once: true }) |
| ); |
|
|