diff --git a/.vscode/settings.json b/.vscode/settings.json index c394720..ddd3786 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,6 +4,10 @@ "npx tsc": true, "git remote": true, "npx asar": true, - "npx tsx": true + "npx tsx": true, + "gh": true, + "git add": true, + "git commit": true, + "git push": true } -} \ No newline at end of file +} diff --git a/AGENTS.md b/AGENTS.md index a2c9b63..9a4525e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,6 +7,11 @@ This document provides context and best practices for GitHub Copilot when workin - Make the plan extremely concise. Sacrifice grammar for the sake of concision. - At the end of each plan, give me a list of unresolved questions to answer, if any. +## Commits + +- commit messages are short - one sentence. do not write long articles. +- pull requests are more verbose and especially give reasoning for changes + --- ## ⚠️ MANDATORY: Test-First Development diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 80c83ba..e24b0c5 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -12,6 +12,14 @@ import https from 'https'; import http from 'http'; import { URL } from 'url'; import { BrowserWindow } from 'electron'; +import { + parseAnthropicStreamEvent, + parseOpenAIStreamEvent, + createAnthropicStreamAccumulator, + createOpenAIStreamAccumulator, + httpRequestStream, + withRetry, +} from './streaming'; import { ChatEngine } from './ChatEngine'; import { PostEngine, type PostData } from './PostEngine'; import { MediaEngine, type MediaData } from './MediaEngine'; @@ -129,6 +137,8 @@ interface AnthropicContentBlock { input?: unknown; tool_use_id?: string; content?: string | AnthropicToolResultContent[]; + is_error?: boolean; + signature?: string; source?: { type: 'base64'; media_type: string; @@ -463,6 +473,7 @@ export class OpenCodeManager { while (round < MAX_TOOL_ROUNDS) { round++; + if (signal.aborted) break; const body: Record = 
{ model: modelId, @@ -470,42 +481,79 @@ export class OpenCodeManager { system: systemPrompt, messages, tools, + stream: true, cache_control: { type: 'ephemeral' }, }; - const response = await this.httpRequest(ZEN_ANTHROPIC_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': this.apiKey, - 'Authorization': `Bearer ${this.apiKey}`, - 'anthropic-version': '2023-06-01', - }, - body: JSON.stringify(body), - signal, + // Retry only the HTTP connection (429/502/503 are caught before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. + const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_ANTHROPIC_URL, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': this.apiKey, + 'Authorization': `Bearer ${this.apiKey}`, + 'anthropic-version': '2023-06-01', + }, + body: JSON.stringify(body), + signal, + }); }); - if (response.statusCode >= 400) { - const errorMsg = this.parseErrorResponse(response); - throw new Error(errorMsg); + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createAnthropicStreamAccumulator(); + let stopReason = ''; + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let roundText = ''; + let receivedUsage = false; + + try { + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + receivedUsage = true; + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = 
result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } + + if (result.finishReason) { + stopReason = result.finishReason; + } + + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } - const data = JSON.parse(response.body); + const streamToolCalls = streamAccumulator.toolCalls; + const streamThinkingBlocks = streamAccumulator.thinkingBlocks; - // Extract and emit token usage - if (data.usage && callbacks.onTokenUsage) { - const usage = data.usage; - const cacheReadTokens = usage.cache_read_input_tokens || 0; - const cacheWriteTokens = usage.cache_creation_input_tokens || 0; - const inputTokens = (usage.input_tokens || 0) - cacheReadTokens - cacheWriteTokens; - const outputTokens = usage.output_tokens || 0; - const totalTokens = (usage.input_tokens || 0) + outputTokens; + // Emit token usage after stream completes (only when usage data was received) + if (callbacks.onTokenUsage && receivedUsage) { + const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; + const totalTokens = inputTokens + outputTokens; const prev = this.conversationUsage.get(conversationId) || { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0, }; const cumulative = { - inputTokens: prev.inputTokens + inputTokens, + inputTokens: prev.inputTokens + adjustedInputTokens, outputTokens: prev.outputTokens + outputTokens, cacheReadTokens: prev.cacheReadTokens + cacheReadTokens, cacheWriteTokens: prev.cacheWriteTokens + cacheWriteTokens, @@ -513,7 +561,7 @@ export class OpenCodeManager { this.conversationUsage.set(conversationId, cumulative); callbacks.onTokenUsage({ - inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens, + inputTokens: adjustedInputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens, cumulativeInputTokens: cumulative.inputTokens, 
cumulativeOutputTokens: cumulative.outputTokens, cumulativeCacheReadTokens: cumulative.cacheReadTokens, @@ -522,35 +570,20 @@ export class OpenCodeManager { }); } - console.log('[OpenCodeManager] Round', round, 'stop_reason:', data.stop_reason, 'content blocks:', JSON.stringify(data.content?.map((b: AnthropicContentBlock) => ({ type: b.type, textLen: b.text?.length, name: b.name })))); - - if (!data.content) { - throw new Error('API response missing content field'); - } - - // Check if there are tool_use blocks - const toolUseBlocks = (data.content as AnthropicContentBlock[]).filter( - (b: AnthropicContentBlock) => b.type === 'tool_use' - ); - - // Capture text from any block type that has a text field (text, thinking, etc.) - const textBlocks = (data.content as AnthropicContentBlock[]).filter( - (b: AnthropicContentBlock) => b.text - ); - - // Accumulate and stream text content to frontend - for (const block of textBlocks) { - if (block.text) { - accumulatedText += block.text; - if (callbacks.onDelta) { - callbacks.onDelta(block.text); - } + // Collect tool calls from stream accumulator + const toolUseBlocks: Array<{ id: string; name: string; input: unknown; parseError?: string }> = []; + for (const [, tc] of streamToolCalls) { + try { + toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); + } catch (e) { + console.error(`[OpenCodeManager] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + toolUseBlocks.push({ id: tc.id, name: tc.name, input: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` }); } } - console.log('[OpenCodeManager] Round', round, 'accumulatedText length:', accumulatedText.length, 'toolUseBlocks:', toolUseBlocks.length); + console.log('[OpenCodeManager] Round', round, 'stopReason:', stopReason, 'accumulatedText length:', accumulatedText.length, 'toolCalls:', toolUseBlocks.length); - if (toolUseBlocks.length === 0 || data.stop_reason !== 'tool_use') { + if (toolUseBlocks.length === 
0 || stopReason !== 'tool_use') { // No more tool calls - return all accumulated text console.log('[OpenCodeManager] Returning accumulated text length:', accumulatedText.length); return { content: accumulatedText, toolCalls: allToolCalls }; @@ -558,11 +591,37 @@ export class OpenCodeManager { // Execute tool calls const toolResults: AnthropicContentBlock[] = []; + // Build assistant content blocks for the next message round + const assistantContentBlocks: AnthropicContentBlock[] = []; + + // Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled) + for (const [, tb] of streamThinkingBlocks) { + if (tb.text) { + const thinkingBlock: AnthropicContentBlock = { type: 'thinking', text: tb.text }; + if (tb.signature) { + thinkingBlock.signature = tb.signature; + } + assistantContentBlocks.push(thinkingBlock); + } + } + + // Add text block with text from this round + if (roundText) { + assistantContentBlocks.push({ type: 'text', text: roundText }); + } for (const toolBlock of toolUseBlocks) { - const toolName = toolBlock.name!; + const toolName = toolBlock.name; const toolArgs = toolBlock.input; - const toolUseId = toolBlock.id!; + const toolUseId = toolBlock.id; + + // Add tool_use block to assistant content + assistantContentBlocks.push({ + type: 'tool_use', + id: toolUseId, + name: toolName, + input: toolArgs, + }); allToolCalls.push({ name: toolName, args: toolArgs }); @@ -570,6 +629,21 @@ export class OpenCodeManager { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolBlock.parseError) { + const errorResult = { error: true, message: toolBlock.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + toolResults.push({ + type: 'tool_result', + tool_use_id: toolUseId, + content: JSON.stringify(errorResult), + is_error: true, + }); + continue; + } + // Check if this is 
a render tool — generate A2UI messages instead of executing if (isRenderTool(toolName)) { const a2uiMessages = generateFromToolCall( @@ -593,7 +667,8 @@ export class OpenCodeManager { continue; } - // Execute the tool + // Execute the tool (check abort before each tool execution) + if (signal.aborted) break; const result = await this.executeTool(toolName, toolArgs as Record); if (callbacks.onToolResult) { @@ -640,10 +715,12 @@ export class OpenCodeManager { } } + if (signal.aborted) break; + // Add assistant response and tool results to messages for next round messages = [ ...messages, - { role: 'assistant' as const, content: data.content }, + { role: 'assistant' as const, content: assistantContentBlocks }, { role: 'user' as const, content: toolResults }, ]; } @@ -712,39 +789,77 @@ export class OpenCodeManager { while (round < MAX_TOOL_ROUNDS) { round++; + if (signal.aborted) break; const body: Record = { model: modelId, max_tokens: 4096, messages, tools: openaiTools, + stream: true, + stream_options: { include_usage: true }, }; - const response = await this.httpRequest(ZEN_OPENAI_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${this.apiKey}`, - }, - body: JSON.stringify(body), - signal, + // Retry only the HTTP connection (429/502/503 are caught before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. 
+ const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_OPENAI_URL, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${this.apiKey}`, + }, + body: JSON.stringify(body), + signal, + }); }); - if (response.statusCode >= 400) { - const errorMsg = this.parseErrorResponse(response); - throw new Error(errorMsg); + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createOpenAIStreamAccumulator(); + let finishReason = ''; + let promptTokens = 0; + let completionTokens = 0; + let totalTokens = 0; + let cacheReadTokens = 0; + let roundText = ''; + let receivedUsage = false; + + try { + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + receivedUsage = true; + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } + + if (result.finishReason) { + finishReason = result.finishReason; + } + + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } - const data = JSON.parse(response.body); - const choice = data.choices?.[0]; + const streamToolCalls = streamAccumulator.toolCalls; - // Extract and emit token usage (OpenAI format) - if (data.usage && callbacks.onTokenUsage) { - const usage = data.usage; - const cacheReadTokens = usage.prompt_tokens_details?.cached_tokens || 0; - const inputTokens = 
(usage.prompt_tokens || 0) - cacheReadTokens; - const outputTokens = usage.completion_tokens || 0; - const totalTokens = usage.total_tokens || (usage.prompt_tokens || 0) + outputTokens; + // Emit token usage after stream completes (only when usage data was received) + if (callbacks.onTokenUsage && receivedUsage) { + const inputTokens = promptTokens - cacheReadTokens; + const outputTokens = completionTokens; const prev = this.conversationUsage.get(conversationId) || { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0, @@ -758,7 +873,9 @@ export class OpenCodeManager { this.conversationUsage.set(conversationId, cumulative); callbacks.onTokenUsage({ - inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0, totalTokens, + inputTokens, outputTokens, cacheReadTokens, + cacheWriteTokens: 0, // OpenAI streaming does not report cache write tokens + totalTokens: totalTokens || inputTokens + outputTokens, cumulativeInputTokens: cumulative.inputTokens, cumulativeOutputTokens: cumulative.outputTokens, cumulativeCacheReadTokens: cumulative.cacheReadTokens, @@ -767,66 +884,64 @@ export class OpenCodeManager { }); } - console.log('[OpenCodeManager:OpenAI] Round', round, 'status:', response.statusCode, 'content type:', typeof choice?.message?.content, 'content length:', choice?.message?.content?.length, 'tool_calls:', choice?.message?.tool_calls?.length); - - if (!choice?.message) { - throw new Error('API response missing expected message content'); + // Collect tool calls from stream accumulator + const parsedToolCalls: Array<{ id: string; name: string; args: unknown; parseError?: string }> = []; + for (const [, tc] of streamToolCalls) { + try { + parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); + } catch (e) { + console.error(`[OpenCodeManager:OpenAI] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + parsedToolCalls.push({ id: tc.id, name: tc.name, args: {}, parseError: `Failed to parse tool 
arguments: ${(e as Error).message}` }); + } } - // Handle content that might be a string or an array of content parts - let textContent = ''; - const content = choice.message.content; - if (typeof content === 'string') { - textContent = content; - } else if (Array.isArray(content)) { - // Handle array of content parts (some models return this format) - // Accept any part that has a text field, regardless of type - textContent = content - .filter((part: { type?: string; text?: string }) => part.text) - .map((part: { text: string }) => part.text) - .join(''); - - // Log what types we're seeing for debugging - const types = content.map((p: { type?: string }) => p.type).filter(Boolean); - if (types.length > 0) { - console.log('[OpenCodeManager:OpenAI] Content block types:', types); - } - } else if (content && typeof content === 'object') { - // Handle single object with text field - if ('text' in content && typeof content.text === 'string') { - textContent = content.text; - } - } - - if (textContent) { - accumulatedText += textContent; - if (callbacks.onDelta) { - callbacks.onDelta(textContent); - } - } + console.log('[OpenCodeManager:OpenAI] Round', round, 'finishReason:', finishReason, 'text length:', accumulatedText.length, 'toolCalls:', parsedToolCalls.length); // If no tool calls, we're done - if (!choice.message.tool_calls || choice.message.tool_calls.length === 0) { + if (parsedToolCalls.length === 0 || finishReason !== 'tool_calls') { console.log('[OpenCodeManager:OpenAI] Done. 
Accumulated text length:', accumulatedText.length); return { content: accumulatedText, toolCalls: allToolCalls }; } - // Add assistant message (with tool_calls) to conversation - messages.push(choice.message); + // Build the assistant message with tool_calls for conversation history + const assistantMessage: Record = { + role: 'assistant', + content: roundText || null, + tool_calls: parsedToolCalls.map((tc) => ({ + id: tc.id, + type: 'function', + function: { name: tc.name, arguments: JSON.stringify(tc.args) }, + })), + }; + messages.push(assistantMessage); // Execute tool calls and add results - for (const toolCall of choice.message.tool_calls) { - const toolName = toolCall.function.name; - const toolArgs = JSON.parse(toolCall.function.arguments || '{}'); + for (const toolCall of parsedToolCalls) { + const toolName = toolCall.name; + const toolArgs = toolCall.args; allToolCalls.push({ name: toolName, args: toolArgs }); if (callbacks.onToolCall) { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolCall.parseError) { + const errorResult = { error: true, message: toolCall.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + messages.push({ + role: 'tool', + content: JSON.stringify(errorResult), + tool_call_id: toolCall.id, + }); + continue; + } + // Check if this is a render tool if (isRenderTool(toolName)) { - const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs); + const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record); if (a2uiMessages) { emitA2UIMessages(a2uiMessages); } @@ -843,7 +958,9 @@ export class OpenCodeManager { continue; } - const result = await this.executeTool(toolName, toolArgs); + // Check abort before each tool execution + if (signal.aborted) break; + const result = await this.executeTool(toolName, toolArgs as Record); if 
(callbacks.onToolResult) { callbacks.onToolResult({ name: toolName, result }); @@ -855,6 +972,8 @@ export class OpenCodeManager { tool_call_id: toolCall.id, }); } + + if (signal.aborted) break; } // Hit max rounds @@ -2251,7 +2370,7 @@ Respond with JSON only: {"title": "...", "alt": "...", "caption": "..."}`; options.signal.addEventListener('abort', () => { req.destroy(); reject(new Error('Request cancelled')); - }); + }, { once: true }); } if (options.body) { diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts new file mode 100644 index 0000000..5f40ac8 --- /dev/null +++ b/src/main/engine/streaming.ts @@ -0,0 +1,620 @@ +/** + * SSE Streaming Infrastructure + * + * Provides SSE line parsing, event parsers for OpenAI/Mistral and Anthropic + * stream formats, tool-call accumulation, and retry-with-exponential-backoff. + * + * Used by OpenCodeManager to convert buffered HTTP calls to real-time + * token-by-token streaming for all chat providers. + */ + +import https from 'https'; +import http from 'http'; +import { URL } from 'url'; + +// ── Types ── + +export interface SSEEvent { + event?: string; + data: string; +} + +export interface StreamEventResult { + /** Text content delta to emit to UI */ + textDelta?: string; + /** Whether the stream is complete */ + done: boolean; + /** Finish reason from the model */ + finishReason?: string; + /** Token usage information */ + usage?: { + promptTokens?: number; + completionTokens?: number; + totalTokens?: number; + inputTokens?: number; + outputTokens?: number; + cacheReadTokens?: number; + cacheWriteTokens?: number; + }; +} + +interface ToolCallAccumulator { + id: string; + name: string; + arguments: string; +} + +export interface OpenAIStreamAccumulator { + toolCalls: Map; +} + +export interface AnthropicStreamAccumulator { + toolCalls: Map; + thinkingBlocks: Map; +} + +export interface HttpStreamError extends Error { + statusCode?: number; + retryAfter?: number; + isAbort?: boolean; +} + +// ── SSE 
// Line Parsing ──

/**
 * Return the value part of an SSE field line (`name:value` or `name: value`).
 * Per the SSE format, exactly one leading space after the colon is dropped.
 */
function sseFieldValue(line: string): string {
  const afterColon = line.slice(line.indexOf(':') + 1);
  return afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon;
}

/**
 * Parse raw SSE text into structured events.
 *
 * SSE protocol: events are separated by double-newlines (\n\n).
 * Each event can have `event:` and `data:` lines.
 * Multiple `data:` lines within one event are concatenated with newlines.
 * Lines starting with `:` are comments (ignored). Events without any
 * `data:` line are dropped.
 *
 * @param text Buffered SSE text (any previous `remaining` plus the new chunk).
 * @returns The complete events found, and `remaining`: the trailing text that
 *   is not yet terminated by a blank line and must be prepended to the next chunk.
 */
export function parseSSELines(text: string): { events: SSEEvent[]; remaining: string } {
  const events: SSEEvent[] = [];

  // Normalize \r\n to \n so the event boundary is always "\n\n".
  const normalized = text.replace(/\r\n/g, '\n');
  const parts = normalized.split('\n\n');

  // Without a trailing blank line the last part is an unfinished event.
  const remaining = normalized.endsWith('\n\n') ? '' : parts.pop() || '';

  for (const part of parts) {
    if (!part.trim()) continue;

    let eventType: string | undefined;
    const dataLines: string[] = [];

    for (const line of part.split('\n')) {
      if (line.startsWith(':')) continue; // comment / keep-alive line

      if (line.startsWith('event:')) {
        eventType = sseFieldValue(line);
      } else if (line.startsWith('data:')) {
        dataLines.push(sseFieldValue(line));
      }
    }

    if (dataLines.length > 0) {
      events.push({ event: eventType, data: dataLines.join('\n') });
    }
  }

  return { events, remaining };
}

// ── Accumulator Factories ──

/** Create an empty accumulator for one OpenAI/Mistral streamed response. */
export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator {
  return { toolCalls: new Map() };
}

/** Create an empty accumulator for one Anthropic streamed response. */
export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator {
  return { toolCalls: new Map(), thinkingBlocks: new Map() };
}

// ── Payload Narrowing Helpers ──

/** Return `value` when it is a JSON object (non-null, non-array), else undefined. */
function asSSERecord(value: unknown): Record<string, unknown> | undefined {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : undefined;
}

/** Return `value` when it is a string, else the empty string. */
function sseString(value: unknown): string {
  return typeof value === 'string' ? value : '';
}

/** Return `value` when it is a number, else undefined (field not reported). */
function sseOptionalNumber(value: unknown): number | undefined {
  return typeof value === 'number' ? value : undefined;
}

/** Return `value` when it is a usable number, else 0. */
function sseCount(value: unknown): number {
  return typeof value === 'number' && !Number.isNaN(value) ? value : 0;
}

/**
 * Decode an SSE `data:` payload as a JSON object.
 *
 * Returns undefined both for malformed JSON and for valid JSON that is not an
 * object (`null`, numbers, strings, arrays), so the parsers can skip the event
 * instead of throwing a TypeError on property access.
 */
function parseSSEJsonObject(raw: string): Record<string, unknown> | undefined {
  try {
    const parsed: unknown = JSON.parse(raw);
    return asSSERecord(parsed);
  } catch {
    return undefined;
  }
}

/** Extract a human-readable message from a streamed `error` payload. */
function sseErrorMessage(error: unknown): string {
  if (typeof error === 'string' && error) return error;
  return sseString(asSSERecord(error)?.message) || 'Unknown streaming error';
}

// ── OpenAI/Mistral SSE Parser ──

/**
 * Fold one `delta.tool_calls[]` entry into the accumulator.
 *
 * A call is opened by the first fragment carrying an `id` or function name;
 * later fragments append to `arguments`. Fragments for a call that was never
 * opened are dropped. When the provider omits `index`, the entry's position in
 * the array is used so several calls in one chunk are not merged together.
 */
function mergeOpenAIToolCallDelta(
  accumulator: OpenAIStreamAccumulator,
  rawCall: unknown,
  position: number,
): void {
  const call = asSSERecord(rawCall);
  if (!call) return;

  const fn = asSSERecord(call.function);
  const id = sseString(call.id);
  const name = sseString(fn?.name);
  const argsFragment = sseString(fn?.arguments);
  const idx = typeof call.index === 'number' ? call.index : position;

  const existing = accumulator.toolCalls.get(idx);
  if (!existing) {
    if (id || name) {
      accumulator.toolCalls.set(idx, { id, name, arguments: argsFragment });
    }
    return;
  }

  if (id) existing.id = id;
  if (name) existing.name = name;
  existing.arguments += argsFragment;
}

/**
 * Parse a single OpenAI/Mistral SSE event and update the accumulator.
 *
 * OpenAI streaming format:
 * - Text deltas: choices[0].delta.content
 * - Tool call start: delta.tool_calls[i] with id + function.name
 * - Tool call fragments: delta.tool_calls[i].function.arguments (append)
 * - Finish reason: choices[0].finish_reason
 * - Usage: usage object in final chunk (requires stream_options.include_usage)
 * - [DONE] sentinel: stop iteration
 *
 * @param event One parsed SSE event.
 * @param accumulator Per-response state; its `toolCalls` map is mutated.
 * @returns Text delta, finish reason and usage carried by this event, plus `done`.
 * @throws Error when the payload carries an `error` object (mid-stream failure),
 *   mirroring the Anthropic parser instead of silently ending with no text.
 */
export function parseOpenAIStreamEvent(
  event: SSEEvent,
  accumulator: OpenAIStreamAccumulator,
): StreamEventResult {
  if (event.data === '[DONE]') {
    return { done: true };
  }

  // Skip payloads that are not a JSON object (corrupted or unexpected events).
  const data = parseSSEJsonObject(event.data);
  if (!data) {
    return { done: false };
  }

  if (data.error) {
    throw new Error(sseErrorMessage(data.error));
  }

  const result: StreamEventResult = { done: false };

  const choices = data.choices;
  const choice = asSSERecord(Array.isArray(choices) ? choices[0] : undefined);
  if (choice) {
    const delta = asSSERecord(choice.delta);

    const text = sseString(delta?.content);
    if (text) {
      result.textDelta = text;
    }

    const toolCallDeltas = delta?.tool_calls;
    if (Array.isArray(toolCallDeltas)) {
      toolCallDeltas.forEach((rawCall: unknown, position: number) => {
        mergeOpenAIToolCallDelta(accumulator, rawCall, position);
      });
    }

    const finishReason = sseString(choice.finish_reason);
    if (finishReason) {
      result.finishReason = finishReason;
      // A call to a tool without parameters may stream no argument text at all.
      // Callers JSON.parse the accumulated string, so make it a valid empty object.
      for (const call of accumulator.toolCalls.values()) {
        if (!call.arguments.trim()) call.arguments = '{}';
      }
    }
  }

  // Token usage (arrives in final chunk with stream_options.include_usage)
  const usage = asSSERecord(data.usage);
  if (usage) {
    const promptDetails = asSSERecord(usage.prompt_tokens_details);
    result.usage = {
      promptTokens: sseOptionalNumber(usage.prompt_tokens),
      completionTokens: sseOptionalNumber(usage.completion_tokens),
      totalTokens: sseOptionalNumber(usage.total_tokens),
      cacheReadTokens: sseOptionalNumber(promptDetails?.cached_tokens),
    };
  }

  return result;
}

// ── Anthropic SSE Parser ──

/**
 * Parse a single Anthropic SSE event and update the accumulator.
 *
 * Anthropic streaming format uses named event types:
 * - message_start: input token usage
 * - content_block_start: text, tool_use, or thinking block begins
 * - content_block_delta: text_delta, input_json_delta, thinking_delta, or signature_delta
 * - content_block_stop: block ends
 * - message_delta: output tokens + stop_reason
 * - message_stop: stream complete
 * - ping: keep-alive (ignored)
 * - error: server error mid-stream
 *
 * The event name is taken from the SSE `event:` line; when that line is
 * missing, the payload's own `type` field is used instead.
 *
 * @param event One parsed SSE event.
 * @param accumulator Per-response state; `toolCalls` and `thinkingBlocks` are mutated.
 * @returns Text delta, finish reason and usage carried by this event, plus `done`.
 * @throws Error for an `error` event, with the server-provided message when present.
 */
export function parseAnthropicStreamEvent(
  event: SSEEvent,
  accumulator: AnthropicStreamAccumulator,
): StreamEventResult {
  // Skip payloads that are not a JSON object (corrupted or unexpected events).
  const data = parseSSEJsonObject(event.data);
  if (!data) {
    return { done: false };
  }

  const result: StreamEventResult = { done: false };
  const eventType = event.event || sseString(data.type) || undefined;
  // Content-block events are keyed by the payload's `index`, used as-is.
  const blockIndex = data.index as number;

  switch (eventType) {
    case 'message_start': {
      const usage = asSSERecord(asSSERecord(data.message)?.usage);
      if (usage) {
        result.usage = {
          inputTokens: sseCount(usage.input_tokens),
          cacheReadTokens: sseCount(usage.cache_read_input_tokens),
          cacheWriteTokens: sseCount(usage.cache_creation_input_tokens),
        };
      }
      break;
    }

    case 'content_block_start': {
      const block = asSSERecord(data.content_block);
      if (block?.type === 'tool_use') {
        accumulator.toolCalls.set(blockIndex, {
          id: sseString(block.id),
          name: sseString(block.name),
          arguments: '',
        });
      } else if (block?.type === 'thinking') {
        accumulator.thinkingBlocks.set(blockIndex, { text: '' });
      }
      // text block start is a no-op (empty initial text)
      break;
    }

    case 'content_block_delta': {
      const delta = asSSERecord(data.delta);
      switch (delta?.type) {
        case 'text_delta': {
          const text = sseString(delta.text);
          if (text) result.textDelta = text;
          break;
        }
        case 'input_json_delta': {
          const toolCall = accumulator.toolCalls.get(blockIndex);
          if (toolCall) toolCall.arguments += sseString(delta.partial_json);
          break;
        }
        case 'thinking_delta': {
          const thinking = accumulator.thinkingBlocks.get(blockIndex);
          if (thinking) thinking.text += sseString(delta.thinking);
          break;
        }
        case 'signature_delta': {
          // The signature that must accompany a replayed thinking block is
          // streamed as its own delta just before the block stops.
          const thinking = accumulator.thinkingBlocks.get(blockIndex);
          const signature = sseString(delta.signature);
          if (thinking && signature) {
            thinking.signature = (thinking.signature ?? '') + signature;
          }
          break;
        }
        default:
          break;
      }
      break;
    }

    case 'content_block_stop': {
      // Still accept a signature delivered on the stop event itself.
      const stopBlock = asSSERecord(data.content_block);
      const stopSignature = sseString(stopBlock?.signature);
      if (stopBlock?.type === 'thinking' && stopSignature) {
        const thinking = accumulator.thinkingBlocks.get(blockIndex);
        if (thinking) thinking.signature = stopSignature;
      }

      // A tool_use block with no input may stream no JSON text at all.
      // Callers JSON.parse the accumulated string, so make it a valid empty object.
      const toolCall = accumulator.toolCalls.get(blockIndex);
      if (toolCall && !toolCall.arguments.trim()) {
        toolCall.arguments = '{}';
      }
      break;
    }

    case 'message_delta': {
      const usage = asSSERecord(data.usage);
      if (usage) {
        result.usage = { outputTokens: sseCount(usage.output_tokens) };
      }
      const stopReason = sseString(asSSERecord(data.delta)?.stop_reason);
      if (stopReason) {
        result.finishReason = stopReason;
      }
      break;
    }

    case 'message_stop':
      result.done = true;
      break;

    case 'ping':
      // Keep-alive, ignore
      break;

    case 'error':
      throw new Error(sseErrorMessage(data.error));

    default:
      // Unknown event type, ignore
      break;
  }

  return result;
}

// ── Retry with Exponential Backoff ──

const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);

// Documentation for `withRetry`, which follows:
//
// Retry a function with exponential backoff for transient HTTP errors.
//
// Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable).
// Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since
// these indicate transient network failures during connection.
//
// Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort.
// Respects Retry-After header for 429 responses.
//
// Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry,
// NOT the event processing loop. This ensures onDelta callbacks are never
// called twice for the same text on retry.
/**
 * Retry a function with exponential backoff for transient HTTP errors.
 *
 * Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable).
 * Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since
 * these indicate transient network failures during connection.
 *
 * Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort.
 * Respects the `retryAfter` value (seconds) carried by the error, which
 * httpRequestStream fills in from the Retry-After header of 429 responses.
 *
 * Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry,
 * NOT the event processing loop. This ensures onDelta callbacks are never
 * called twice for the same text on retry.
 *
 * @param fn - Operation to attempt; called once per attempt, always at least once.
 * @param options.maxRetries - Retries after the first attempt (default 3).
 * @param options.onRetry - Called before each wait with the 1-based retry number
 *   and the error that triggered it.
 * @param options.signal - Aborting it cancels a pending wait and prevents
 *   further attempts.
 * @param options.baseDelayMs - Delay before the first retry; doubles on each
 *   subsequent retry (default 1000 → 1s, 2s, 4s).
 * @param options.maxJitterMs - Upper bound of the random jitter added to every
 *   delay (default 500).
 * @returns The value `fn` resolves with.
 * @throws The last error from `fn` when it is not retryable or retries are
 *   exhausted; an Error('Request cancelled') with `isAbort = true` when the
 *   signal aborts.
 */
export async function withRetry<T>(
  fn: () => Promise<T>,
  options: {
    maxRetries?: number;
    onRetry?: (attempt: number, error: Error) => void;
    signal?: AbortSignal;
    baseDelayMs?: number;
    maxJitterMs?: number;
  } = {},
): Promise<T> {
  const { signal, onRetry } = options;
  const maxRetries = options.maxRetries ?? 3;
  const baseDelayMs = options.baseDelayMs ?? 1000;
  const maxJitterMs = options.maxJitterMs ?? 500;

  const createAbortError = (): Error =>
    Object.assign(new Error('Request cancelled'), { isAbort: true });

  // Abort-aware delay: rejects immediately if the signal fires during the wait,
  // and detaches its abort listener once the wait is over so listeners do not
  // accumulate on a signal that outlives many retries.
  const abortableDelay = (ms: number): Promise<void> =>
    new Promise<void>((resolve, reject) => {
      if (signal?.aborted) {
        reject(createAbortError());
        return;
      }
      const onAbort = (): void => {
        clearTimeout(timer);
        reject(createAbortError());
      };
      const timer = setTimeout(() => {
        signal?.removeEventListener('abort', onAbort);
        resolve();
      }, ms);
      signal?.addEventListener('abort', onAbort, { once: true });
    });

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      // `fn` may reject with anything, including null; read fields defensively.
      const details = (error ?? {}) as Partial<HttpStreamError>;

      // Don't retry on abort
      if (details.isAbort || details.message === 'Request cancelled') {
        throw error;
      }

      // Check signal before retrying
      if (signal?.aborted) {
        throw createAbortError();
      }

      // Don't retry on non-retryable status codes
      if (details.statusCode && !RETRYABLE_STATUS_CODES.has(details.statusCode)) {
        throw error;
      }

      // Don't retry if we've exhausted retries (also covers negative/NaN maxRetries)
      if (!(attempt < maxRetries)) {
        throw error;
      }

      // Exponential backoff plus jitter
      let delay = Math.pow(2, attempt) * baseDelayMs + Math.random() * maxJitterMs;

      // Never wait less than the server asked for
      if (details.retryAfter && details.retryAfter > 0) {
        delay = Math.max(delay, details.retryAfter * 1000);
      }

      onRetry?.(attempt + 1, error instanceof Error ? error : new Error(String(error)));

      await abortableDelay(delay);
    }
  }
}
// ── HTTP Streaming Request ──

interface HttpStreamOptions {
  method?: string;
  headers?: Record<string, string>;
  body?: string;
  signal?: AbortSignal;
  timeout?: number;
}

/**
 * Make an HTTP request that returns an async iterable of SSE events.
 *
 * Uses Node.js http/https modules directly, reading the response
 * as a readable stream and parsing SSE events incrementally.
 *
 * The response is decoded as UTF-8 at the stream level, so a multi-byte
 * character that is split across two network chunks is reassembled instead
 * of being turned into replacement characters.
 *
 * On non-2xx status: collects the error body and rejects with an error that
 * carries `statusCode` (and `retryAfter` in seconds for 429 responses).
 * Supports AbortSignal for cancellation; errors caused by an abort are
 * flagged with `isAbort = true`, both before and after the headers arrive.
 *
 * @param urlStr - Absolute http(s) URL to request.
 * @param options - Method (default POST), headers, body, abort signal and
 *   socket timeout in milliseconds (default 120000).
 * @returns Resolves once response headers arrive with the status code and a
 *   lazily-consumed stream of SSE events. Response listeners are attached
 *   when iteration starts.
 */
export function httpRequestStream(
  urlStr: string,
  options: HttpStreamOptions,
): Promise<{
  statusCode: number;
  events: AsyncIterable<SSEEvent>;
}> {
  return new Promise((resolve, reject) => {
    const url = new URL(urlStr);
    const protocol = url.protocol === 'https:' ? https : http;
    const timeout = options.timeout ?? 120000;
    const { signal } = options;

    const req = protocol.request(url, {
      method: options.method || 'POST',
      headers: options.headers || {},
      timeout,
    }, (res) => {
      const statusCode = res.statusCode || 0;

      // Let the stream decode UTF-8 so characters split across chunks survive.
      res.setEncoding('utf8');

      // Non-2xx: collect error body and throw
      if (statusCode < 200 || statusCode >= 300) {
        let errorBody = '';
        res.on('data', (chunk: string) => { errorBody += chunk; });

        // If the connection drops while the error body is being read, 'end'
        // never fires; settle the promise here so the caller does not hang.
        res.on('error', (err: Error) => {
          const error: HttpStreamError = err as HttpStreamError;
          error.statusCode = statusCode;
          if (signal?.aborted) {
            error.isAbort = true;
          }
          reject(error);
        });

        res.on('end', () => {
          const error: HttpStreamError = new Error(`API error: ${statusCode}`) as HttpStreamError;
          error.statusCode = statusCode;

          // Parse Retry-After for 429
          if (statusCode === 429) {
            const retryAfter = res.headers['retry-after'];
            if (retryAfter) {
              const seconds = parseInt(retryAfter, 10);
              if (!isNaN(seconds)) {
                error.retryAfter = seconds;
              }
            }
          }

          // Try to extract a better error message
          try {
            const parsed = JSON.parse(errorBody);
            error.message = parsed.error?.message || parsed.message || error.message;
          } catch {
            if (errorBody.length > 0) {
              error.message = `${error.message}: ${errorBody.slice(0, 200)}`;
            }
          }
          reject(error);
        });
        return;
      }

      // 2xx: create async iterable of SSE events
      const events: AsyncIterable<SSEEvent> = {
        [Symbol.asyncIterator]() {
          let buffer = '';
          let done = false;
          let pendingError: Error | null = null;
          const eventQueue: SSEEvent[] = [];
          // At most one consumer is waiting at a time; these hold its promise callbacks.
          let resolveNext: ((value: IteratorResult<SSEEvent>) => void) | null = null;
          let rejectNext: ((error: Error) => void) | null = null;

          res.on('data', (chunk: string) => {
            buffer += chunk;
            const { events: parsed, remaining } = parseSSELines(buffer);
            buffer = remaining;

            for (const event of parsed) {
              if (resolveNext) {
                // Hand the event straight to the waiting consumer
                const deliver = resolveNext;
                resolveNext = null;
                rejectNext = null;
                deliver({ value: event, done: false });
              } else {
                eventQueue.push(event);
              }
            }
          });

          res.on('end', () => {
            done = true;
            if (resolveNext) {
              const finish = resolveNext;
              resolveNext = null;
              rejectNext = null;
              finish({ value: undefined as unknown as SSEEvent, done: true });
            }
          });

          res.on('error', (err: Error) => {
            done = true;
            const error: HttpStreamError = err as HttpStreamError;
            // A cancelled request surfaces here once headers have arrived;
            // mark it so callers can tell it apart from a network failure.
            if (signal?.aborted) {
              error.isAbort = true;
            }
            if (rejectNext) {
              const fail = rejectNext;
              resolveNext = null;
              rejectNext = null;
              fail(error);
            } else {
              // Store error for next .next() call so it's not silently swallowed
              pendingError = error;
            }
          });

          return {
            next(): Promise<IteratorResult<SSEEvent>> {
              // Return queued event immediately
              if (eventQueue.length > 0) {
                return Promise.resolve({ value: eventQueue.shift()!, done: false });
              }

              // Throw stored error from a previous event that fired with no consumer waiting
              if (pendingError) {
                const err = pendingError;
                pendingError = null;
                return Promise.reject(err);
              }

              // Stream already ended
              if (done) {
                return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });
              }

              // Wait for next event
              return new Promise<IteratorResult<SSEEvent>>((resolveWait, rejectWait) => {
                resolveNext = resolveWait;
                rejectNext = rejectWait;
              });
            },
            return(): Promise<IteratorResult<SSEEvent>> {
              // Called when for-await-of exits early (break, return, throw).
              // Destroy the response stream to free the socket immediately.
              done = true;
              res.destroy();
              return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });
            },
          };
        },
      };

      resolve({ statusCode, events });
    });

    req.on('error', (err: Error) => {
      const error: HttpStreamError = err as HttpStreamError;
      if (signal?.aborted) {
        error.isAbort = true;
      }
      reject(error);
    });

    req.on('timeout', () => {
      req.destroy();
      reject(new Error('Request timed out'));
    });

    if (signal) {
      if (signal.aborted) {
        req.destroy();
        const error: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
        error.isAbort = true;
        reject(error);
        return;
      }
      const onAbort = (): void => {
        req.destroy();
      };
      signal.addEventListener('abort', onAbort, { once: true });
      // Detach once the request is finished so a long-lived signal does not
      // keep a listener (and this request) alive for every call made with it.
      req.once('close', () => {
        signal.removeEventListener('abort', onAbort);
      });
    }

    if (options.body) {
      req.write(options.body);
    }
    req.end();
  });
}
/**
 * Tests for SSE streaming infrastructure (PR 1)
 *
 * Covers:
 * - SSE line parsing (buffering partial lines across TCP chunks)
 * - OpenAI/Mistral SSE event parsing (text deltas, tool calls, usage, [DONE])
 * - Anthropic SSE event parsing (message_start, content_block_delta, etc.)
 * - Tool-call argument accumulation during streaming
 * - Error handling (mid-stream errors, non-2xx status, abort)
 * - Retry with exponential backoff (429/502/503, Retry-After, no retry on 4xx/abort)
 */

import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import http from 'http';
import {
  parseSSELines,
  parseOpenAIStreamEvent,
  parseAnthropicStreamEvent,
  withRetry,
  httpRequestStream,
  type SSEEvent,
  type OpenAIStreamAccumulator,
  type AnthropicStreamAccumulator,
  createOpenAIStreamAccumulator,
  createAnthropicStreamAccumulator,
} from '../../src/main/engine/streaming';

// ── SSE Line Parsing ──

describe('parseSSELines', () => {
  // JSON body shared by the tests that feed one OpenAI-style chunk.
  const helloPayload = '{"id":"1","choices":[{"delta":{"content":"Hello"}}]}';

  it('parses a complete SSE event from a single chunk', () => {
    const parsed = parseSSELines(`data: ${helloPayload}\n\n`);

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0]).toEqual({ event: undefined, data: helloPayload });
    expect(parsed.remaining).toBe('');
  });

  it('handles partial lines across TCP chunks', () => {
    // The first chunk stops in the middle of the data line.
    const head = 'data: {"id":"1","cho';
    const first = parseSSELines(head);
    expect(first.events).toHaveLength(0);
    expect(first.remaining).toBe(head);

    // Prepending the leftover to the second chunk completes the event.
    const tail = 'ices":[{"delta":{"content":"Hello"}}]}\n\n';
    const second = parseSSELines(first.remaining + tail);
    expect(second.events).toHaveLength(1);
    expect(second.events[0].data).toBe(helloPayload);
    expect(second.remaining).toBe('');
  });

  it('handles multiple events in a single chunk', () => {
    const parsed = parseSSELines('data: {"a":1}\n\ndata: {"b":2}\n\n');

    expect(parsed.events).toHaveLength(2);
    expect(parsed.events[0].data).toBe('{"a":1}');
    expect(parsed.events[1].data).toBe('{"b":2}');
    expect(parsed.remaining).toBe('');
  });

  it('handles named event types (Anthropic format)', () => {
    const parsed = parseSSELines('event: message_start\ndata: {"type":"message_start"}\n\n');

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0].event).toBe('message_start');
    expect(parsed.events[0].data).toBe('{"type":"message_start"}');
    expect(parsed.remaining).toBe('');
  });

  it('handles [DONE] sentinel', () => {
    const parsed = parseSSELines('data: [DONE]\n\n');

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0].data).toBe('[DONE]');
    expect(parsed.remaining).toBe('');
  });

  it('ignores empty data lines (keep-alive pings)', () => {
    // A lone ':' is an SSE comment line and must not produce an event.
    const parsed = parseSSELines(':\n\ndata: {"a":1}\n\n');

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0].data).toBe('{"a":1}');
    expect(parsed.remaining).toBe('');
  });

  it('handles multiple data lines for a single event (concatenation per SSE spec)', () => {
    const parsed = parseSSELines('data: line1\ndata: line2\n\n');

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0].data).toBe('line1\nline2');
    expect(parsed.remaining).toBe('');
  });

  it('returns incomplete data as remaining buffer', () => {
    const fragment = 'data: {"partial';
    const parsed = parseSSELines(fragment);

    expect(parsed.events).toHaveLength(0);
    expect(parsed.remaining).toBe(fragment);
  });

  it('handles \\r\\n line endings', () => {
    const parsed = parseSSELines('data: {"a":1}\r\n\r\n');

    expect(parsed.events).toHaveLength(1);
    expect(parsed.events[0].data).toBe('{"a":1}');
    expect(parsed.remaining).toBe('');
  });
});
// ── OpenAI/Mistral Stream Event Parsing ──

describe('parseOpenAIStreamEvent', () => {
  let accumulator: OpenAIStreamAccumulator;

  /** Serialize a chunk payload into an unnamed SSE event and parse it against the per-test accumulator. */
  const feed = (payload: unknown) => {
    const event: SSEEvent = { data: JSON.stringify(payload) };
    return parseOpenAIStreamEvent(event, accumulator);
  };

  /** Chunk payload whose single choice carries the given delta (plus optional choice-level fields). */
  const choiceChunk = (delta: Record<string, unknown>, choiceFields: Record<string, unknown> = {}) => ({
    choices: [{ delta, ...choiceFields, index: 0 }],
  });

  /** Chunk payload whose single choice carries only tool_calls entries. */
  const toolCallChunk = (calls: unknown[]) => choiceChunk({ tool_calls: calls });

  beforeEach(() => {
    accumulator = createOpenAIStreamAccumulator();
  });

  it('extracts text delta from content field', () => {
    const result = feed({ id: 'chatcmpl-1', ...choiceChunk({ content: 'Hello' }) });

    expect(result.textDelta).toBe('Hello');
    expect(result.done).toBe(false);
  });

  it('accumulates tool call start (id + name)', () => {
    const result = feed({
      id: 'chatcmpl-1',
      ...toolCallChunk([
        { index: 0, id: 'call_abc', function: { name: 'search_posts', arguments: '' } },
      ]),
    });

    expect(result.textDelta).toBeUndefined();
    expect(accumulator.toolCalls.get(0)).toEqual({
      id: 'call_abc',
      name: 'search_posts',
      arguments: '',
    });
  });

  it('accumulates tool call argument fragments', () => {
    // Opening chunk announces the call; the next two carry argument pieces.
    feed(toolCallChunk([
      { index: 0, id: 'call_abc', function: { name: 'search_posts', arguments: '' } },
    ]));
    feed(toolCallChunk([{ index: 0, function: { arguments: '{"query"' } }]));
    feed(toolCallChunk([{ index: 0, function: { arguments: ': "test"}' } }]));

    expect(accumulator.toolCalls.get(0)?.arguments).toBe('{"query": "test"}');
  });

  it('handles multiple concurrent tool calls', () => {
    // Both calls arrive in the same chunk, distinguished by their index.
    feed(toolCallChunk([
      { index: 0, id: 'call_1', function: { name: 'search_posts', arguments: '{"q":"a"}' } },
      { index: 1, id: 'call_2', function: { name: 'list_posts', arguments: '{"limit":5}' } },
    ]));

    expect(accumulator.toolCalls.get(0)?.name).toBe('search_posts');
    expect(accumulator.toolCalls.get(1)?.name).toBe('list_posts');
  });

  it('detects finish_reason stop', () => {
    const result = feed(choiceChunk({}, { finish_reason: 'stop' }));

    expect(result.finishReason).toBe('stop');
  });

  it('detects finish_reason tool_calls', () => {
    const result = feed(choiceChunk({}, { finish_reason: 'tool_calls' }));

    expect(result.finishReason).toBe('tool_calls');
  });

  it('extracts token usage from final chunk', () => {
    const result = feed({
      ...choiceChunk({}),
      usage: {
        prompt_tokens: 150,
        completion_tokens: 42,
        total_tokens: 192,
      },
    });

    expect(result.usage).toEqual({
      promptTokens: 150,
      completionTokens: 42,
      totalTokens: 192,
    });
  });

  it('handles [DONE] sentinel', () => {
    // The sentinel is a raw string, not JSON, so it bypasses the feed() helper.
    const sentinel: SSEEvent = { data: '[DONE]' };
    const result = parseOpenAIStreamEvent(sentinel, accumulator);

    expect(result.done).toBe(true);
  });

  it('returns empty result for empty content delta', () => {
    const result = feed(choiceChunk({ content: '' }));

    expect(result.textDelta).toBeUndefined();
  });
});
// ── Anthropic Stream Event Parsing ──

describe('parseAnthropicStreamEvent', () => {
  let accumulator: AnthropicStreamAccumulator;

  /**
   * Build a named Anthropic SSE event. The payload's `type` field mirrors the
   * SSE event name, as in the real stream.
   */
  const named = (type: string, fields: Record<string, unknown> = {}): SSEEvent => ({
    event: type,
    data: JSON.stringify({ type, ...fields }),
  });

  /** content_block_start event for the block at `index`. */
  const blockStart = (index: number, content_block: Record<string, unknown>): SSEEvent =>
    named('content_block_start', { index, content_block });

  /** content_block_delta event for the block at `index`. */
  const blockDelta = (index: number, delta: Record<string, unknown>): SSEEvent =>
    named('content_block_delta', { index, delta });

  /** Parse an event against the per-test accumulator. */
  const feed = (event: SSEEvent) => parseAnthropicStreamEvent(event, accumulator);

  beforeEach(() => {
    accumulator = createAnthropicStreamAccumulator();
  });

  it('extracts input_tokens from message_start', () => {
    const result = feed(named('message_start', {
      message: {
        id: 'msg_1',
        model: 'claude-sonnet-4-5',
        usage: {
          input_tokens: 150,
          cache_read_input_tokens: 50,
          cache_creation_input_tokens: 10,
        },
      },
    }));

    expect(result.usage).toEqual({
      inputTokens: 150,
      cacheReadTokens: 50,
      cacheWriteTokens: 10,
    });
  });

  it('handles text content_block_start (no-op)', () => {
    const result = feed(blockStart(0, { type: 'text', text: '' }));

    expect(result.textDelta).toBeUndefined();
  });

  it('handles tool_use content_block_start', () => {
    const result = feed(blockStart(1, { type: 'tool_use', id: 'toolu_abc', name: 'search_posts' }));

    expect(result.textDelta).toBeUndefined();
    expect(accumulator.toolCalls.get(1)).toEqual({
      id: 'toolu_abc',
      name: 'search_posts',
      arguments: '',
    });
  });

  it('extracts text_delta from content_block_delta', () => {
    const result = feed(blockDelta(0, { type: 'text_delta', text: 'Hello world' }));

    expect(result.textDelta).toBe('Hello world');
  });

  it('accumulates tool input_json_delta fragments', () => {
    // Open the tool block, then stream its JSON input in two pieces.
    feed(blockStart(1, { type: 'tool_use', id: 'toolu_abc', name: 'search_posts' }));
    feed(blockDelta(1, { type: 'input_json_delta', partial_json: '{"query"' }));
    feed(blockDelta(1, { type: 'input_json_delta', partial_json: ': "test"}' }));

    expect(accumulator.toolCalls.get(1)?.arguments).toBe('{"query": "test"}');
  });

  it('extracts output_tokens from message_delta', () => {
    const result = feed(named('message_delta', {
      delta: { stop_reason: 'end_turn' },
      usage: { output_tokens: 42 },
    }));

    expect(result.usage).toEqual({ outputTokens: 42 });
    expect(result.finishReason).toBe('end_turn');
  });

  it('signals done on message_stop', () => {
    const result = feed(named('message_stop'));

    expect(result.done).toBe(true);
  });

  it('ignores ping events', () => {
    const result = feed(named('ping'));

    expect(result.textDelta).toBeUndefined();
    expect(result.done).toBe(false);
  });

  it('throws on error events', () => {
    const failure = named('error', {
      error: { type: 'overloaded_error', message: 'Server is overloaded' },
    });

    expect(() => feed(failure)).toThrow('Server is overloaded');
  });

  it('signals tool_use finish reason from message_delta', () => {
    const result = feed(named('message_delta', {
      delta: { stop_reason: 'tool_use' },
      usage: { output_tokens: 10 },
    }));

    expect(result.finishReason).toBe('tool_use');
  });

  it('handles thinking content_block_start', () => {
    const result = feed(blockStart(0, { type: 'thinking', thinking: '' }));

    expect(result.textDelta).toBeUndefined();
    expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: '' });
  });

  it('accumulates thinking_delta fragments', () => {
    feed(blockStart(0, { type: 'thinking', thinking: '' }));
    feed(blockDelta(0, { type: 'thinking_delta', thinking: 'Let me think' }));
    feed(blockDelta(0, { type: 'thinking_delta', thinking: ' about this...' }));

    expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Let me think about this...');
  });

  it('does not emit thinking_delta as textDelta', () => {
    feed(blockStart(0, { type: 'thinking', thinking: '' }));

    const result = feed(blockDelta(0, { type: 'thinking_delta', thinking: 'Internal reasoning' }));

    // thinking_delta must NOT leak to textDelta — it's internal model reasoning
    expect(result.textDelta).toBeUndefined();
  });

  it('accumulates thinking and text blocks independently', () => {
    // Block 0 is thinking, block 1 is the visible answer.
    feed(blockStart(0, { type: 'thinking', thinking: '' }));
    feed(blockDelta(0, { type: 'thinking_delta', thinking: 'Reasoning...' }));

    const textResult = feed(blockDelta(1, { type: 'text_delta', text: 'Here is my answer' }));

    // Thinking accumulated separately
    expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Reasoning...');
    // Text still emitted as textDelta
    expect(textResult.textDelta).toBe('Here is my answer');
  });
});
// ── Tool Call Accumulation ──

describe('tool call accumulation', () => {
  it('OpenAI: builds complete tool calls from fragments', () => {
    const acc = createOpenAIStreamAccumulator();

    /** Feed one chunk whose only choice carries a single tool_calls entry. */
    const pushToolCall = (call: Record<string, unknown>): void => {
      parseOpenAIStreamEvent({
        data: JSON.stringify({
          choices: [{ delta: { tool_calls: [call] }, index: 0 }],
        }),
      }, acc);
    };

    // The opening chunk names the call; the arguments follow piece by piece.
    pushToolCall({ index: 0, id: 'call_1', function: { name: 'search_posts', arguments: '' } });
    ['{"', 'query', '": "', 'hello', '"}'].forEach((piece) => {
      pushToolCall({ index: 0, function: { arguments: piece } });
    });

    const built = acc.toolCalls.get(0)!;
    expect(built.id).toBe('call_1');
    expect(built.name).toBe('search_posts');
    expect(JSON.parse(built.arguments)).toEqual({ query: 'hello' });
  });

  it('Anthropic: builds complete tool calls from fragments', () => {
    const acc = createAnthropicStreamAccumulator();

    /** Feed one named event whose payload `type` mirrors the event name. */
    const push = (type: string, fields: Record<string, unknown>): void => {
      parseAnthropicStreamEvent({ event: type, data: JSON.stringify({ type, ...fields }) }, acc);
    };

    // Open the tool_use block at index 1, then stream its JSON input.
    push('content_block_start', {
      index: 1,
      content_block: { type: 'tool_use', id: 'toolu_1', name: 'list_posts' },
    });
    ['{"', 'limit', '": ', '5}'].forEach((piece) => {
      push('content_block_delta', {
        index: 1,
        delta: { type: 'input_json_delta', partial_json: piece },
      });
    });

    const built = acc.toolCalls.get(1)!;
    expect(built.id).toBe('toolu_1');
    expect(built.name).toBe('list_posts');
    expect(JSON.parse(built.arguments)).toEqual({ limit: 5 });
  });
});
// ── Retry with Exponential Backoff ──

describe('withRetry', () => {
  /** Build an Error carrying the extra fields withRetry inspects (statusCode, retryAfter, isAbort). */
  const failure = (message: string, extra: Record<string, unknown>): Error =>
    Object.assign(new Error(message), extra);

  // Every test runs on fake timers so backoff delays never cost wall-clock time.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  it('returns result on first successful call', async () => {
    const fn = vi.fn().mockResolvedValue('success');

    const result = await withRetry(fn, { maxRetries: 3 });

    expect(result).toBe('success');
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it('retries on 429 status and succeeds', async () => {
    const fn = vi.fn()
      .mockRejectedValueOnce(failure('Rate limited', { statusCode: 429 }))
      .mockResolvedValue('success');

    const promise = withRetry(fn, { maxRetries: 3 });
    // Advance past the retry delay (1s base + up to 500ms jitter)
    await vi.advanceTimersByTimeAsync(2000);

    expect(await promise).toBe('success');
    expect(fn).toHaveBeenCalledTimes(2);
  });

  it('retries on 502 status', async () => {
    const fn = vi.fn()
      .mockRejectedValueOnce(failure('Bad Gateway', { statusCode: 502 }))
      .mockResolvedValue('ok');

    const promise = withRetry(fn, { maxRetries: 3 });
    await vi.advanceTimersByTimeAsync(2000);

    expect(await promise).toBe('ok');
    expect(fn).toHaveBeenCalledTimes(2);
  });

  it('retries on 503 status', async () => {
    const fn = vi.fn()
      .mockRejectedValueOnce(failure('Service Unavailable', { statusCode: 503 }))
      .mockResolvedValue('ok');

    const promise = withRetry(fn, { maxRetries: 3 });
    await vi.advanceTimersByTimeAsync(2000);

    expect(await promise).toBe('ok');
    expect(fn).toHaveBeenCalledTimes(2);
  });

  it('does NOT retry on 400 status', async () => {
    const fn = vi.fn().mockRejectedValue(failure('Bad Request', { statusCode: 400 }));

    await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Bad Request');
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it('does NOT retry on 401 status', async () => {
    const fn = vi.fn().mockRejectedValue(failure('Unauthorized', { statusCode: 401 }));

    await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Unauthorized');
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it('does NOT retry on 403 status', async () => {
    const fn = vi.fn().mockRejectedValue(failure('Forbidden', { statusCode: 403 }));

    await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Forbidden');
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it('does NOT retry on abort', async () => {
    const fn = vi.fn().mockRejectedValue(failure('Request cancelled', { isAbort: true }));

    await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Request cancelled');
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it('exhausts max retries and throws last error', async () => {
    const fn = vi.fn().mockRejectedValue(failure('Rate limited', { statusCode: 429 }));

    // Attach the rejection expectation BEFORE advancing time so the eventual
    // rejection is never reported as unhandled.
    const outcome = expect(withRetry(fn, { maxRetries: 2 })).rejects.toThrow('Rate limited');
    // Two backoff waits: ~1s and ~2s, each with up to 500ms jitter.
    await vi.advanceTimersByTimeAsync(10000);
    await outcome;

    expect(fn).toHaveBeenCalledTimes(3); // 1 initial + 2 retries
  });

  it('respects Retry-After header for 429', async () => {
    const fn = vi.fn()
      .mockRejectedValueOnce(failure('Rate limited', { statusCode: 429, retryAfter: 5 }))
      .mockResolvedValue('ok');

    const promise = withRetry(fn, { maxRetries: 3 });
    // Should NOT have retried yet at 3 seconds (Retry-After is 5)
    await vi.advanceTimersByTimeAsync(3000);
    expect(fn).toHaveBeenCalledTimes(1);
    // Advance past the Retry-After
    await vi.advanceTimersByTimeAsync(3000);

    expect(await promise).toBe('ok');
    expect(fn).toHaveBeenCalledTimes(2);
  });
});
3 }); + // Should NOT have retried yet at 3 seconds (Retry-After is 5) + await vi.advanceTimersByTimeAsync(3000); + expect(fn).toHaveBeenCalledTimes(1); + // Advance past the Retry-After + await vi.advanceTimersByTimeAsync(3000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + afterEach(() => { + vi.useRealTimers(); + }); +}); + +// ── Full stream-to-result integration ── + +describe('stream event sequences', () => { + it('OpenAI: processes a complete text response stream', () => { + const acc = createOpenAIStreamAccumulator(); + const textChunks: string[] = []; + + const events: SSEEvent[] = [ + { data: JSON.stringify({ choices: [{ delta: { role: 'assistant' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: 'Hello' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: ' world' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: '!' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: {}, finish_reason: 'stop', index: 0 }], usage: { prompt_tokens: 10, completion_tokens: 3, total_tokens: 13 } }) }, + { data: '[DONE]' }, + ]; + + for (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Hello world!'); + }); + + it('Anthropic: processes a complete text response stream', () => { + const acc = createAnthropicStreamAccumulator(); + const textChunks: string[] = []; + + const events: SSEEvent[] = [ + { event: 'message_start', data: JSON.stringify({ type: 'message_start', message: { id: 'msg_1', model: 'claude-sonnet-4', usage: { input_tokens: 100 } } }) }, + { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 0, 
delta: { type: 'text_delta', text: 'Hello' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: ' world!' } }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 0 }) }, + { event: 'message_delta', data: JSON.stringify({ type: 'message_delta', delta: { stop_reason: 'end_turn' }, usage: { output_tokens: 5 } }) }, + { event: 'message_stop', data: JSON.stringify({ type: 'message_stop' }) }, + ]; + + for (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Hello world!'); + }); + + it('OpenAI: processes a tool call response stream', () => { + const acc = createOpenAIStreamAccumulator(); + + const events: SSEEvent[] = [ + { data: JSON.stringify({ choices: [{ delta: { role: 'assistant', tool_calls: [{ index: 0, id: 'call_1', function: { name: 'search_posts', arguments: '' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: '{"query"' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: ': "test"}' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: {}, finish_reason: 'tool_calls', index: 0 }] }) }, + { data: '[DONE]' }, + ]; + + for (const event of events) { + parseOpenAIStreamEvent(event, acc); + } + + expect(acc.toolCalls.size).toBe(1); + const tc = acc.toolCalls.get(0)!; + expect(tc.name).toBe('search_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ query: 'test' }); + }); + + it('Anthropic: processes a tool call response stream', () => { + const acc = createAnthropicStreamAccumulator(); + + const events: SSEEvent[] = [ + { event: 'message_start', data: JSON.stringify({ type: 'message_start', message: { id: 'msg_1', usage: { input_tokens: 100 } } }) }, 
+ { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Let me search.' } }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 0 }) }, + { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 1, content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '{"query": "test"}' } }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 1 }) }, + { event: 'message_delta', data: JSON.stringify({ type: 'message_delta', delta: { stop_reason: 'tool_use' }, usage: { output_tokens: 20 } }) }, + { event: 'message_stop', data: JSON.stringify({ type: 'message_stop' }) }, + ]; + + const textChunks: string[] = []; + for (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Let me search.'); + expect(acc.toolCalls.size).toBe(1); + const tc = acc.toolCalls.get(1)!; + expect(tc.name).toBe('search_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ query: 'test' }); + }); +}); + +// ── JSON Parse Error Resilience ── + +describe('parser JSON error resilience', () => { + it('OpenAI: skips corrupted SSE events with invalid JSON', () => { + const acc = createOpenAIStreamAccumulator(); + const event: SSEEvent = { data: '{corrupted json' }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.done).toBe(false); + expect(result.textDelta).toBeUndefined(); + }); + + it('OpenAI: recovers from corrupted event and processes subsequent valid events', () => { 
+ const acc = createOpenAIStreamAccumulator(); + + // Corrupted event + parseOpenAIStreamEvent({ data: 'not-json' }, acc); + + // Valid event after corruption + const result = parseOpenAIStreamEvent({ + data: JSON.stringify({ choices: [{ delta: { content: 'OK' }, index: 0 }] }), + }, acc); + expect(result.textDelta).toBe('OK'); + }); + + it('Anthropic: skips corrupted SSE events with invalid JSON', () => { + const acc = createAnthropicStreamAccumulator(); + const event: SSEEvent = { event: 'content_block_delta', data: '{broken' }; + const result = parseAnthropicStreamEvent(event, acc); + expect(result.done).toBe(false); + expect(result.textDelta).toBeUndefined(); + }); + + it('Anthropic: recovers from corrupted event and processes subsequent valid events', () => { + const acc = createAnthropicStreamAccumulator(); + + // Corrupted event + parseAnthropicStreamEvent({ event: 'ping', data: 'not-json' }, acc); + + // Valid event after corruption + const result = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'Recovered' }, + }), + }, acc); + expect(result.textDelta).toBe('Recovered'); + }); +}); + +// ── OpenAI Cache Token Extraction ── + +describe('OpenAI cache token extraction', () => { + it('extracts cached_tokens from prompt_tokens_details', () => { + const acc = createOpenAIStreamAccumulator(); + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, index: 0 }], + usage: { + prompt_tokens: 150, + completion_tokens: 42, + total_tokens: 192, + prompt_tokens_details: { cached_tokens: 100 }, + }, + }), + }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.usage).toEqual({ + promptTokens: 150, + completionTokens: 42, + totalTokens: 192, + cacheReadTokens: 100, + }); + }); + + it('returns undefined cacheReadTokens when prompt_tokens_details is absent', () => { + const acc = createOpenAIStreamAccumulator(); + const 
event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, index: 0 }], + usage: { + prompt_tokens: 150, + completion_tokens: 42, + total_tokens: 192, + }, + }), + }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.usage?.cacheReadTokens).toBeUndefined(); + }); +}); + +// ── httpRequestStream ── + +describe('httpRequestStream', () => { + // Use a real HTTP server for integration tests (avoids ESM spyOn limitations) + + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('parses streamed SSE events from response data chunks', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":" world"}}]}\n\n'); + res.write('data: [DONE]\n\n'); + res.end(); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const collected: SSEEvent[] = []; + for await (const event of events) { + collected.push(event); + } + expect(collected).toHaveLength(3); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"Hello"}}]}'); + expect(collected[1].data).toBe('{"choices":[{"delta":{"content":" world"}}]}'); + expect(collected[2].data).toBe('[DONE]'); + } finally { + await srv.close(); + } + }); + + it('collects error body and rejects on non-2xx status', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '5' }); + 
res.end(JSON.stringify({ error: { message: 'Rate limited' } })); + }); + + try { + await expect(httpRequestStream(srv.url, {})).rejects.toMatchObject({ + message: 'Rate limited', + statusCode: 429, + retryAfter: 5, + }); + } finally { + await srv.close(); + } + }); + + it('propagates mid-stream errors to async iterable consumer', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n'); + // Destroy the socket to simulate TCP disconnect + setTimeout(() => res.destroy(), 20); + }); + + try { + const { events } = await httpRequestStream(srv.url, {}); + const collected: SSEEvent[] = []; + await expect(async () => { + for await (const event of events) { + collected.push(event); + } + }).rejects.toThrow(); + + // Should have received the first event before the error + expect(collected).toHaveLength(1); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"Hi"}}]}'); + } finally { + await srv.close(); + } + }); + + it('propagates stored error when no consumer was waiting (pendingError fix)', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + // Send data and immediately destroy — error fires before consumer calls .next() + res.write('data: {"ok":true}\n\n'); + // Give a tiny delay so the data event fires first + setTimeout(() => res.destroy(), 5); + }); + + try { + const { events } = await httpRequestStream(srv.url, {}); + const iter = events[Symbol.asyncIterator](); + + // Wait a bit for both data and error to fire + await new Promise(resolve => setTimeout(resolve, 50)); + + // First call should return the queued event + const first = await iter.next(); + expect(first.done).toBe(false); + expect(first.value.data).toBe('{"ok":true}'); + + // Second call should throw the stored (pending) error + await expect(iter.next()).rejects.toThrow(); + } 
finally { + await srv.close(); + } + }); + + it('handles already-aborted signal', async () => { + // No server needed — should reject immediately + const controller = new AbortController(); + controller.abort(); + + await expect(httpRequestStream('http://localhost:1/test', { + signal: controller.signal, + })).rejects.toMatchObject({ + isAbort: true, + }); + }); + + it('handles non-JSON error body', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(500, { 'Content-Type': 'text/plain' }); + res.end('Internal Server Error'); + }); + + try { + await expect(httpRequestStream(srv.url, {})).rejects.toMatchObject({ + statusCode: 500, + }); + } finally { + await srv.close(); + } + }); +}); + +// ── withRetry onRetry callback ── + +describe('withRetry onRetry callback', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('calls onRetry callback before each retry attempt', async () => { + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const onRetry = vi.fn(); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockRejectedValueOnce(error429) + .mockResolvedValue('success'); + + const promise = withRetry(fn, { maxRetries: 3, onRetry }); + await vi.advanceTimersByTimeAsync(10000); + const result = await promise; + + expect(result).toBe('success'); + expect(onRetry).toHaveBeenCalledTimes(2); + expect(onRetry).toHaveBeenCalledWith(1, error429); + expect(onRetry).toHaveBeenCalledWith(2, error429); + }); + + it('does not call onRetry when first attempt succeeds', async () => { + const onRetry = vi.fn(); + const fn = vi.fn().mockResolvedValue('ok'); + + const result = await withRetry(fn, { maxRetries: 3, onRetry }); + + expect(result).toBe('ok'); + expect(onRetry).not.toHaveBeenCalled(); + }); +}); + +// ── Mid-stream retry integration ── + +describe('mid-stream retry with withRetry', () => { + it('retries stream consumption on transient mid-stream 
error', async () => { + vi.useRealTimers(); + + let attempt = 0; + const fn = async () => { + attempt++; + if (attempt === 1) { + // First attempt: simulate partial stream then error + const error = Object.assign(new Error('Service temporarily unavailable'), { statusCode: 503 }); + throw error; + } + // Second attempt: succeed + return { text: 'Hello world!', toolCalls: [] }; + }; + + const result = await withRetry(fn, { maxRetries: 2 }); + expect(result).toEqual({ text: 'Hello world!', toolCalls: [] }); + expect(attempt).toBe(2); + }); + + it('retries on mid-stream TCP error (no status code)', async () => { + vi.useRealTimers(); + + let attempt = 0; + const fn = async () => { + attempt++; + if (attempt === 1) { + throw new Error('ECONNRESET'); + } + return 'recovered'; + }; + + const result = await withRetry(fn, { maxRetries: 2 }); + expect(result).toBe('recovered'); + expect(attempt).toBe(2); + }); + + it('does not retry mid-stream abort errors', async () => { + const abortError = Object.assign(new Error('Request cancelled'), { isAbort: true }); + + let attempt = 0; + const fn = async () => { + attempt++; + throw abortError; + }; + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Request cancelled'); + expect(attempt).toBe(1); + }); +}); + +// ── Connection-only retry (no double-emission) ── + +describe('connection-only retry pattern (withRetry wrapping httpRequestStream)', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('retries 429 at connection time without emitting duplicate deltas', async () => { + let requestCount = 0; + const srv = 
await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '0' }); + res.end(JSON.stringify({ error: { message: 'Rate limited' } })); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":" world"}}]}\n\n'); + res.write('data: [DONE]\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + // Retry only the connection, process events outside retry + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + // Each delta appears exactly once — no double-emission + expect(deltas).toEqual(['Hello', ' world']); + expect(requestCount).toBe(2); // 1 failed + 1 success + } finally { + await srv.close(); + } + }); + + it('mid-stream TCP error propagates without retry when only connection is wrapped', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n'); + // Destroy socket to simulate mid-stream TCP disconnect + setTimeout(() => res.destroy(), 20); + }); + + try { + const deltas: string[] = []; + + // Only connection is retried — mid-stream errors propagate + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + await expect(async () => { + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + }).rejects.toThrow(); + + // Partial delta was 
received before the error — no duplication + expect(deltas).toEqual(['Hi']); + } finally { + await srv.close(); + } + }); + + it('retries 502 at connection time then streams successfully', async () => { + let requestCount = 0; + const srv = await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(502); + res.end('Bad Gateway'); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('event: message_start\ndata: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":10}}}\n\n'); + res.write('event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"OK"}}\n\n'); + res.write('event: message_stop\ndata: {"type":"message_stop"}\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createAnthropicStreamAccumulator(); + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + expect(deltas).toEqual(['OK']); + expect(requestCount).toBe(2); + } finally { + await srv.close(); + } + }); +}); + +// ── SSE spec compliance ── + +describe('SSE spec compliance - single space removal', () => { + it('removes exactly one leading space after colon in data field', () => { + const chunk = 'data: {"key": "value"}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].data).toBe('{"key": "value"}'); + }); + + it('preserves data when no space after colon', () => { + const chunk = 'data:{"key":"value"}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].data).toBe('{"key":"value"}'); + }); + + it('preserves extra leading spaces after removing one', () => { + const chunk = 'data: two spaces\n\n'; + const { events } = parseSSELines(chunk); + // Per SSE spec: only one leading space is removed 
+ expect(events[0].data).toBe(' two spaces'); + }); + + it('removes exactly one leading space from event type', () => { + const chunk = 'event: message_start\ndata: {}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].event).toBe('message_start'); + }); + + it('handles event type with no space after colon', () => { + const chunk = 'event:ping\ndata: {}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].event).toBe('ping'); + }); +}); + +// ── Async iterator return() cleanup ── + +describe('async iterator return() cleanup', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('destroys response stream when for-await-of breaks early', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"A"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"B"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"C"}}]}\n\n'); + // Don't end the response — the client should destroy it via return() + // Keep connection alive for a bit + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const collected: SSEEvent[] = []; + for await (const event of events) { + collected.push(event); + if (collected.length === 1) break; // Early exit triggers return() + } + + expect(collected).toHaveLength(1); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"A"}}]}'); + } finally { + await srv.close(); + } + }); + + 
it('return() method signals done and is idempotent', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"ok":true}\n\n'); + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const iter = events[Symbol.asyncIterator](); + + // Consume one event + const first = await iter.next(); + expect(first.done).toBe(false); + + // Call return() explicitly + const returnResult = await iter.return!(undefined as unknown as SSEEvent); + expect(returnResult.done).toBe(true); + + // Subsequent next() should return done + const after = await iter.next(); + expect(after.done).toBe(true); + } finally { + await srv.close(); + } + }); +}); + +// ── Thinking block signature capture ── + +describe('Anthropic thinking block signature', () => { + let accumulator: AnthropicStreamAccumulator; + + beforeEach(() => { + accumulator = createAnthropicStreamAccumulator(); + }); + + it('captures signature from content_block_stop for thinking blocks', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // Accumulate thinking + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Let me reason...' 
}, + }), + }, accumulator); + + // content_block_stop with signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + content_block: { + type: 'thinking', + thinking: 'Let me reason...', + signature: 'ErUBCkYIAxgCIkD+ybfICm10kSig...', + }, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.text).toBe('Let me reason...'); + expect(tb!.signature).toBe('ErUBCkYIAxgCIkD+ybfICm10kSig...'); + }); + + it('leaves signature undefined when content_block_stop has no signature', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // content_block_stop without signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.signature).toBeUndefined(); + }); + + it('does not affect tool_call blocks on content_block_stop', () => { + // Start tool_use block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' }, + }), + }, accumulator); + + // Tool argument fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'input_json_delta', partial_json: '{"query":"test"}' }, + }), + }, accumulator); + + // content_block_stop (no signature for tool blocks) + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + // Tool call 
should be unaffected + const tc = accumulator.toolCalls.get(0); + expect(tc).toBeDefined(); + expect(tc!.arguments).toBe('{"query":"test"}'); + }); + + it('full thinking sequence produces signature on accumulator', () => { + // Full realistic sequence: thinking block -> text block -> tool_use + // Thinking at index 0 + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 1. ' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 2.' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ type: 'content_block_stop', index: 0, content_block: { type: 'thinking', thinking: 'Step 1. Step 2.', signature: 'sig_abc123' } }), + }, accumulator); + + expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: 'Step 1. Step 2.', signature: 'sig_abc123' }); + }); +}); + +// ── withRetry abort-aware delay ── + +describe('withRetry abort during delay', () => { + it('rejects quickly when signal is aborted during retry delay', async () => { + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn().mockRejectedValue(error429); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + + // First attempt fails immediately, then enters retry delay. + // Wait a small amount for the first attempt to fail and delay to start. 
+ await new Promise(r => setTimeout(r, 50)); + expect(fn).toHaveBeenCalledTimes(1); + + // Abort during the delay + controller.abort(); + + // Should reject with abort error, not wait for delay to finish + await expect(promise).rejects.toThrow(); + // Should NOT have made a second attempt — aborted during delay + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does not abort delay when no signal is provided', async () => { + vi.useFakeTimers(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); + + it('works normally when signal is not aborted', async () => { + vi.useFakeTimers(); + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); +});