diff --git a/app/utils/aws.ts b/app/utils/aws.ts index 161766435..dffff338d 100644 --- a/app/utils/aws.ts +++ b/app/utils/aws.ts @@ -24,6 +24,10 @@ export interface ModelValidationConfig { customValidation?: (body: any) => string | null; } +// Type definitions for better type safety +type ParsedEvent = Record; +type EventResult = ParsedEvent[]; + // Encryption utilities export function encrypt(data: string, encryptionKey: string): string { if (!data) return ""; @@ -245,30 +249,29 @@ function decodeBase64(base64String: string): string { } } -export function parseEventData(chunk: Uint8Array): any { +export function parseEventData(chunk: Uint8Array): EventResult { const decoder = new TextDecoder(); const text = decoder.decode(chunk); - const results = []; + const results: EventResult = []; try { - // First try to parse as JSON + // First try to parse as regular JSON const parsed = JSON.parse(text); - - // Handle bytes field in the response if (parsed.bytes) { const decoded = decodeBase64(parsed.bytes); try { - results.push(JSON.parse(decoded)); + const decodedJson = JSON.parse(decoded); + results.push(decodedJson); } catch (e) { results.push({ output: decoded }); } return results; } - // Handle body field if (typeof parsed.body === "string") { try { - results.push(JSON.parse(parsed.body)); + const parsedBody = JSON.parse(parsed.body); + results.push(parsedBody); } catch (e) { results.push({ output: parsed.body }); } @@ -278,48 +281,54 @@ export function parseEventData(chunk: Uint8Array): any { results.push(parsed.body || parsed); return results; } catch (e) { - try { - // Handle event-stream format - const eventRegex = /:event-type[^\{]+({.*?})/g; - let match; - while ((match = eventRegex.exec(text)) !== null) { - try { - const eventData = match[1]; - const parsed = JSON.parse(eventData); - if (parsed.bytes) { - const decoded = decodeBase64(parsed.bytes); - try { - results.push(JSON.parse(decoded)); - } catch (e) { - results.push({ output: decoded }); + // If regular JSON parse fails, try to extract event content + const eventRegex = /:event-type[^\{]+(\{[^\}]+\})/g; + let match; + + while ((match = eventRegex.exec(text)) !== null) { + try { + const eventData = match[1]; + const parsed = JSON.parse(eventData); + + if (parsed.bytes) { + const decoded = decodeBase64(parsed.bytes); + try { + const decodedJson = JSON.parse(decoded); + if (decodedJson.choices?.[0]?.message?.content) { + results.push({ output: decodedJson.choices[0].message.content }); + } else { + results.push(decodedJson); } - } else { - results.push(parsed); + } catch (e) { + results.push({ output: decoded }); } - } catch (e) { - results.push({ output: match[1] }); + } else { + results.push(parsed); } + } catch (e) { + console.debug("[Event Parse Warning]:", e); } + } - if (results.length > 0) { - return results; - } + // If no events were found, try to extract clean text + if (results.length === 0) { + // Remove event metadata markers and clean the text + const cleanText = text + .replace(/\{KG[^:]+:event-type[^}]+\}/g, "") // Remove event markers + .replace(/[\x00-\x1F\x7F-\x9F\uFEFF]/g, "") // Remove control characters + .trim(); - // Handle plain text responses - if (text.trim()) { - const cleanText = text.replace(/[\x00-\x1F\x7F-\x9F]/g, ""); - results.push({ output: cleanText.trim() }); - return results; + if (cleanText) { + results.push({ output: cleanText }); } - } catch (innerError) { - console.error("[AWS Parse Error] Inner parsing failed:", innerError); } } - return []; + + return results; } export function processMessage( - data: any, + data: ParsedEvent, remainText: string, runTools: any[], index: number, @@ -470,8 +479,18 @@ export function processChunks( while (chunks.length > 0) { const chunk = chunks[0]; try { + // If there's a pending chunk, try to merge it with the current chunk + let chunkToProcess = chunk; + if (pendingChunk) { + const mergedChunk = new Uint8Array(pendingChunk.length + chunk.length); + mergedChunk.set(pendingChunk); + mergedChunk.set(chunk, pendingChunk.length); + chunkToProcess = mergedChunk; + pendingChunk = null; + } + // Try to process the chunk - const parsedEvents = parseEventData(chunk); + const parsedEvents = parseEventData(chunkToProcess); if (parsedEvents.length > 0) { // Process each event in the chunk for (const parsed of parsedEvents) { @@ -485,60 +504,15 @@ export function processChunks( currentIndex = result.index; } chunks.shift(); // Remove processed chunk - - // If there's a pending chunk, try to process it now - if (pendingChunk) { - const pendingEvents = parseEventData(pendingChunk); - if (pendingEvents.length > 0) { - for (const pendingParsed of pendingEvents) { - const pendingResult = processMessage( - pendingParsed, - currentText, - runTools, - currentIndex, - ); - currentText = pendingResult.remainText; - currentIndex = pendingResult.index; - } - pendingChunk = null; - } - } } else { // If parsing fails, it might be an incomplete chunk - if (pendingChunk) { - // Merge with pending chunk - const mergedChunk = new Uint8Array( - pendingChunk.length + chunk.length, - ); - mergedChunk.set(pendingChunk); - mergedChunk.set(chunk, pendingChunk.length); - pendingChunk = mergedChunk; - } else { - pendingChunk = chunk; - } + pendingChunk = chunkToProcess; chunks.shift(); } } catch (e) { console.error("[Chunk Process Error]:", e); chunks.shift(); // Remove error chunk - } - } - - // Try to process any remaining pending chunk one last time - if (pendingChunk) { - const finalEvents = parseEventData(pendingChunk); - if (finalEvents.length > 0) { - for (const finalParsed of finalEvents) { - const finalResult = processMessage( - finalParsed, - currentText, - runTools, - currentIndex, - ); - currentText = finalResult.remainText; - currentIndex = finalResult.index; - } - pendingChunk = null; + pendingChunk = null; // Reset pending chunk on error } }