优化bedrock工具类,以更安全的处理流式消息的边界值

This commit is contained in:
glay 2024-12-07 17:54:12 +08:00
parent 57dc44a54f
commit ad49cd0454

View File

@ -24,6 +24,10 @@ export interface ModelValidationConfig {
customValidation?: (body: any) => string | null; customValidation?: (body: any) => string | null;
} }
// Type definitions for better type safety
type ParsedEvent = Record<string, any>;
type EventResult = ParsedEvent[];
// Encryption utilities // Encryption utilities
export function encrypt(data: string, encryptionKey: string): string { export function encrypt(data: string, encryptionKey: string): string {
if (!data) return ""; if (!data) return "";
@ -245,30 +249,29 @@ function decodeBase64(base64String: string): string {
} }
} }
export function parseEventData(chunk: Uint8Array): any { export function parseEventData(chunk: Uint8Array): EventResult {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const text = decoder.decode(chunk); const text = decoder.decode(chunk);
const results = []; const results: EventResult = [];
try { try {
// First try to parse as JSON // First try to parse as regular JSON
const parsed = JSON.parse(text); const parsed = JSON.parse(text);
// Handle bytes field in the response
if (parsed.bytes) { if (parsed.bytes) {
const decoded = decodeBase64(parsed.bytes); const decoded = decodeBase64(parsed.bytes);
try { try {
results.push(JSON.parse(decoded)); const decodedJson = JSON.parse(decoded);
results.push(decodedJson);
} catch (e) { } catch (e) {
results.push({ output: decoded }); results.push({ output: decoded });
} }
return results; return results;
} }
// Handle body field
if (typeof parsed.body === "string") { if (typeof parsed.body === "string") {
try { try {
results.push(JSON.parse(parsed.body)); const parsedBody = JSON.parse(parsed.body);
results.push(parsedBody);
} catch (e) { } catch (e) {
results.push({ output: parsed.body }); results.push({ output: parsed.body });
} }
@ -278,48 +281,54 @@ export function parseEventData(chunk: Uint8Array): any {
results.push(parsed.body || parsed); results.push(parsed.body || parsed);
return results; return results;
} catch (e) { } catch (e) {
try { // If regular JSON parse fails, try to extract event content
// Handle event-stream format const eventRegex = /:event-type[^\{]+(\{[^\}]+\})/g;
const eventRegex = /:event-type[^\{]+({.*?})/g; let match;
let match;
while ((match = eventRegex.exec(text)) !== null) { while ((match = eventRegex.exec(text)) !== null) {
try { try {
const eventData = match[1]; const eventData = match[1];
const parsed = JSON.parse(eventData); const parsed = JSON.parse(eventData);
if (parsed.bytes) {
const decoded = decodeBase64(parsed.bytes); if (parsed.bytes) {
try { const decoded = decodeBase64(parsed.bytes);
results.push(JSON.parse(decoded)); try {
} catch (e) { const decodedJson = JSON.parse(decoded);
results.push({ output: decoded }); if (decodedJson.choices?.[0]?.message?.content) {
results.push({ output: decodedJson.choices[0].message.content });
} else {
results.push(decodedJson);
} }
} else { } catch (e) {
results.push(parsed); results.push({ output: decoded });
} }
} catch (e) { } else {
results.push({ output: match[1] }); results.push(parsed);
} }
} catch (e) {
console.debug("[Event Parse Warning]:", e);
} }
}
if (results.length > 0) { // If no events were found, try to extract clean text
return results; if (results.length === 0) {
} // Remove event metadata markers and clean the text
const cleanText = text
.replace(/\{KG[^:]+:event-type[^}]+\}/g, "") // Remove event markers
.replace(/[\x00-\x1F\x7F-\x9F\uFEFF]/g, "") // Remove control characters
.trim();
// Handle plain text responses if (cleanText) {
if (text.trim()) { results.push({ output: cleanText });
const cleanText = text.replace(/[\x00-\x1F\x7F-\x9F]/g, "");
results.push({ output: cleanText.trim() });
return results;
} }
} catch (innerError) {
console.error("[AWS Parse Error] Inner parsing failed:", innerError);
} }
} }
return [];
return results;
} }
export function processMessage( export function processMessage(
data: any, data: ParsedEvent,
remainText: string, remainText: string,
runTools: any[], runTools: any[],
index: number, index: number,
@ -470,8 +479,18 @@ export function processChunks(
while (chunks.length > 0) { while (chunks.length > 0) {
const chunk = chunks[0]; const chunk = chunks[0];
try { try {
// If there's a pending chunk, try to merge it with the current chunk
let chunkToProcess = chunk;
if (pendingChunk) {
const mergedChunk = new Uint8Array(pendingChunk.length + chunk.length);
mergedChunk.set(pendingChunk);
mergedChunk.set(chunk, pendingChunk.length);
chunkToProcess = mergedChunk;
pendingChunk = null;
}
// Try to process the chunk // Try to process the chunk
const parsedEvents = parseEventData(chunk); const parsedEvents = parseEventData(chunkToProcess);
if (parsedEvents.length > 0) { if (parsedEvents.length > 0) {
// Process each event in the chunk // Process each event in the chunk
for (const parsed of parsedEvents) { for (const parsed of parsedEvents) {
@ -485,60 +504,15 @@ export function processChunks(
currentIndex = result.index; currentIndex = result.index;
} }
chunks.shift(); // Remove processed chunk chunks.shift(); // Remove processed chunk
// If there's a pending chunk, try to process it now
if (pendingChunk) {
const pendingEvents = parseEventData(pendingChunk);
if (pendingEvents.length > 0) {
for (const pendingParsed of pendingEvents) {
const pendingResult = processMessage(
pendingParsed,
currentText,
runTools,
currentIndex,
);
currentText = pendingResult.remainText;
currentIndex = pendingResult.index;
}
pendingChunk = null;
}
}
} else { } else {
// If parsing fails, it might be an incomplete chunk // If parsing fails, it might be an incomplete chunk
if (pendingChunk) { pendingChunk = chunkToProcess;
// Merge with pending chunk
const mergedChunk = new Uint8Array(
pendingChunk.length + chunk.length,
);
mergedChunk.set(pendingChunk);
mergedChunk.set(chunk, pendingChunk.length);
pendingChunk = mergedChunk;
} else {
pendingChunk = chunk;
}
chunks.shift(); chunks.shift();
} }
} catch (e) { } catch (e) {
console.error("[Chunk Process Error]:", e); console.error("[Chunk Process Error]:", e);
chunks.shift(); // Remove error chunk chunks.shift(); // Remove error chunk
} pendingChunk = null; // Reset pending chunk on error
}
// Try to process any remaining pending chunk one last time
if (pendingChunk) {
const finalEvents = parseEventData(pendingChunk);
if (finalEvents.length > 0) {
for (const finalParsed of finalEvents) {
const finalResult = processMessage(
finalParsed,
currentText,
runTools,
currentIndex,
);
currentText = finalResult.remainText;
currentIndex = finalResult.index;
}
pendingChunk = null;
} }
} }