From 78c2cb7bb771a3aabc67e560943a40bb6f172566 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 10:10:54 +0100 Subject: [PATCH 01/10] feat: SSE streaming for chat providers --- src/main/engine/OpenCodeManager.ts | 249 ++++++---- src/main/engine/streaming.ts | 529 ++++++++++++++++++++ tests/engine/streaming.test.ts | 743 +++++++++++++++++++++++++++++ 3 files changed, 1417 insertions(+), 104 deletions(-) create mode 100644 src/main/engine/streaming.ts create mode 100644 tests/engine/streaming.test.ts diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 80c83ba..c433ec1 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -12,6 +12,16 @@ import https from 'https'; import http from 'http'; import { URL } from 'url'; import { BrowserWindow } from 'electron'; +import { + parseSSELines, + parseAnthropicStreamEvent, + parseOpenAIStreamEvent, + createAnthropicStreamAccumulator, + createOpenAIStreamAccumulator, + httpRequestStream, + withRetry, + type HttpStreamError, +} from './streaming'; import { ChatEngine } from './ChatEngine'; import { PostEngine, type PostData } from './PostEngine'; import { MediaEngine, type MediaData } from './MediaEngine'; @@ -470,10 +480,20 @@ export class OpenCodeManager { system: systemPrompt, messages, tools, + stream: true, cache_control: { type: 'ephemeral' }, }; - const response = await this.httpRequest(ZEN_ANTHROPIC_URL, { + // Stream the response with retry for transient errors + const streamAccumulator = createAnthropicStreamAccumulator(); + let stopReason = ''; + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let roundText = ''; // Text produced in this round only + + const { events } = await withRetry(() => httpRequestStream(ZEN_ANTHROPIC_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -483,29 +503,43 @@ export class OpenCodeManager { }, body: JSON.stringify(body), signal, - 
}); + })); - if (response.statusCode >= 400) { - const errorMsg = this.parseErrorResponse(response); - throw new Error(errorMsg); + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); + + // Emit text deltas immediately for real-time streaming + if (result.textDelta) { + accumulatedText += result.textDelta; + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + // Collect usage from message_start (input tokens) and message_delta (output tokens) + if (result.usage) { + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } + + if (result.finishReason) { + stopReason = result.finishReason; + } } - const data = JSON.parse(response.body); - - // Extract and emit token usage - if (data.usage && callbacks.onTokenUsage) { - const usage = data.usage; - const cacheReadTokens = usage.cache_read_input_tokens || 0; - const cacheWriteTokens = usage.cache_creation_input_tokens || 0; - const inputTokens = (usage.input_tokens || 0) - cacheReadTokens - cacheWriteTokens; - const outputTokens = usage.output_tokens || 0; - const totalTokens = (usage.input_tokens || 0) + outputTokens; + // Emit token usage after stream completes + if (callbacks.onTokenUsage) { + const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; + const totalTokens = inputTokens + outputTokens; const prev = this.conversationUsage.get(conversationId) || { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0, }; const cumulative = { - inputTokens: prev.inputTokens + inputTokens, + inputTokens: prev.inputTokens + adjustedInputTokens, outputTokens: 
prev.outputTokens + outputTokens, cacheReadTokens: prev.cacheReadTokens + cacheReadTokens, cacheWriteTokens: prev.cacheWriteTokens + cacheWriteTokens, @@ -513,7 +547,7 @@ export class OpenCodeManager { this.conversationUsage.set(conversationId, cumulative); callbacks.onTokenUsage({ - inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens, + inputTokens: adjustedInputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens, cumulativeInputTokens: cumulative.inputTokens, cumulativeOutputTokens: cumulative.outputTokens, cumulativeCacheReadTokens: cumulative.cacheReadTokens, @@ -522,35 +556,19 @@ export class OpenCodeManager { }); } - console.log('[OpenCodeManager] Round', round, 'stop_reason:', data.stop_reason, 'content blocks:', JSON.stringify(data.content?.map((b: AnthropicContentBlock) => ({ type: b.type, textLen: b.text?.length, name: b.name })))); - - if (!data.content) { - throw new Error('API response missing content field'); - } - - // Check if there are tool_use blocks - const toolUseBlocks = (data.content as AnthropicContentBlock[]).filter( - (b: AnthropicContentBlock) => b.type === 'tool_use' - ); - - // Capture text from any block type that has a text field (text, thinking, etc.) 
- const textBlocks = (data.content as AnthropicContentBlock[]).filter( - (b: AnthropicContentBlock) => b.text - ); - - // Accumulate and stream text content to frontend - for (const block of textBlocks) { - if (block.text) { - accumulatedText += block.text; - if (callbacks.onDelta) { - callbacks.onDelta(block.text); - } + // Collect tool calls from stream accumulator + const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = []; + for (const [, tc] of streamAccumulator.toolCalls) { + try { + toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); + } catch { + toolUseBlocks.push({ id: tc.id, name: tc.name, input: {} }); } } - console.log('[OpenCodeManager] Round', round, 'accumulatedText length:', accumulatedText.length, 'toolUseBlocks:', toolUseBlocks.length); + console.log('[OpenCodeManager] Round', round, 'stopReason:', stopReason, 'accumulatedText length:', accumulatedText.length, 'toolCalls:', toolUseBlocks.length); - if (toolUseBlocks.length === 0 || data.stop_reason !== 'tool_use') { + if (toolUseBlocks.length === 0 || stopReason !== 'tool_use') { // No more tool calls - return all accumulated text console.log('[OpenCodeManager] Returning accumulated text length:', accumulatedText.length); return { content: accumulatedText, toolCalls: allToolCalls }; @@ -558,11 +576,26 @@ export class OpenCodeManager { // Execute tool calls const toolResults: AnthropicContentBlock[] = []; + // Build assistant content blocks for the next message round + const assistantContentBlocks: AnthropicContentBlock[] = []; + + // Add text block with text from this round + if (roundText) { + assistantContentBlocks.push({ type: 'text', text: roundText }); + } for (const toolBlock of toolUseBlocks) { - const toolName = toolBlock.name!; + const toolName = toolBlock.name; const toolArgs = toolBlock.input; - const toolUseId = toolBlock.id!; + const toolUseId = toolBlock.id; + + // Add tool_use block to assistant content + 
assistantContentBlocks.push({ + type: 'tool_use', + id: toolUseId, + name: toolName, + input: toolArgs, + }); allToolCalls.push({ name: toolName, args: toolArgs }); @@ -643,7 +676,7 @@ export class OpenCodeManager { // Add assistant response and tool results to messages for next round messages = [ ...messages, - { role: 'assistant' as const, content: data.content }, + { role: 'assistant' as const, content: assistantContentBlocks }, { role: 'user' as const, content: toolResults }, ]; } @@ -718,9 +751,18 @@ export class OpenCodeManager { max_tokens: 4096, messages, tools: openaiTools, + stream: true, + stream_options: { include_usage: true }, }; - const response = await this.httpRequest(ZEN_OPENAI_URL, { + // Stream the response with retry for transient errors + const streamAccumulator = createOpenAIStreamAccumulator(); + let finishReason = ''; + let promptTokens = 0; + let completionTokens = 0; + let totalTokens = 0; + + const { events } = await withRetry(() => httpRequestStream(ZEN_OPENAI_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -728,23 +770,38 @@ export class OpenCodeManager { }, body: JSON.stringify(body), signal, - }); + })); - if (response.statusCode >= 400) { - const errorMsg = this.parseErrorResponse(response); - throw new Error(errorMsg); + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); + + // Emit text deltas immediately for real-time streaming + if (result.textDelta) { + accumulatedText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + // Collect usage from final chunk + if (result.usage) { + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + } + + if (result.finishReason) { + finishReason = 
result.finishReason; + } + + if (result.done) break; } - const data = JSON.parse(response.body); - const choice = data.choices?.[0]; - - // Extract and emit token usage (OpenAI format) - if (data.usage && callbacks.onTokenUsage) { - const usage = data.usage; - const cacheReadTokens = usage.prompt_tokens_details?.cached_tokens || 0; - const inputTokens = (usage.prompt_tokens || 0) - cacheReadTokens; - const outputTokens = usage.completion_tokens || 0; - const totalTokens = usage.total_tokens || (usage.prompt_tokens || 0) + outputTokens; + // Emit token usage after stream completes + if (callbacks.onTokenUsage) { + const cacheReadTokens = 0; // OpenAI doesn't provide cache info in streaming + const inputTokens = promptTokens; + const outputTokens = completionTokens; const prev = this.conversationUsage.get(conversationId) || { inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0, @@ -758,7 +815,8 @@ export class OpenCodeManager { this.conversationUsage.set(conversationId, cumulative); callbacks.onTokenUsage({ - inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0, totalTokens, + inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0, + totalTokens: totalTokens || inputTokens + outputTokens, cumulativeInputTokens: cumulative.inputTokens, cumulativeOutputTokens: cumulative.outputTokens, cumulativeCacheReadTokens: cumulative.cacheReadTokens, @@ -767,57 +825,40 @@ export class OpenCodeManager { }); } - console.log('[OpenCodeManager:OpenAI] Round', round, 'status:', response.statusCode, 'content type:', typeof choice?.message?.content, 'content length:', choice?.message?.content?.length, 'tool_calls:', choice?.message?.tool_calls?.length); - - if (!choice?.message) { - throw new Error('API response missing expected message content'); + // Collect tool calls from stream accumulator + const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = []; + for (const [, tc] of streamAccumulator.toolCalls) { + try { + 
parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); + } catch { + parsedToolCalls.push({ id: tc.id, name: tc.name, args: {} }); + } } - // Handle content that might be a string or an array of content parts - let textContent = ''; - const content = choice.message.content; - if (typeof content === 'string') { - textContent = content; - } else if (Array.isArray(content)) { - // Handle array of content parts (some models return this format) - // Accept any part that has a text field, regardless of type - textContent = content - .filter((part: { type?: string; text?: string }) => part.text) - .map((part: { text: string }) => part.text) - .join(''); - - // Log what types we're seeing for debugging - const types = content.map((p: { type?: string }) => p.type).filter(Boolean); - if (types.length > 0) { - console.log('[OpenCodeManager:OpenAI] Content block types:', types); - } - } else if (content && typeof content === 'object') { - // Handle single object with text field - if ('text' in content && typeof content.text === 'string') { - textContent = content.text; - } - } - - if (textContent) { - accumulatedText += textContent; - if (callbacks.onDelta) { - callbacks.onDelta(textContent); - } - } + console.log('[OpenCodeManager:OpenAI] Round', round, 'finishReason:', finishReason, 'text length:', accumulatedText.length, 'toolCalls:', parsedToolCalls.length); // If no tool calls, we're done - if (!choice.message.tool_calls || choice.message.tool_calls.length === 0) { + if (parsedToolCalls.length === 0 || finishReason !== 'tool_calls') { console.log('[OpenCodeManager:OpenAI] Done. 
Accumulated text length:', accumulatedText.length); return { content: accumulatedText, toolCalls: allToolCalls }; } - // Add assistant message (with tool_calls) to conversation - messages.push(choice.message); + // Build the assistant message with tool_calls for conversation history + const assistantMessage: Record = { + role: 'assistant', + content: accumulatedText || null, + tool_calls: parsedToolCalls.map((tc) => ({ + id: tc.id, + type: 'function', + function: { name: tc.name, arguments: JSON.stringify(tc.args) }, + })), + }; + messages.push(assistantMessage); // Execute tool calls and add results - for (const toolCall of choice.message.tool_calls) { - const toolName = toolCall.function.name; - const toolArgs = JSON.parse(toolCall.function.arguments || '{}'); + for (const toolCall of parsedToolCalls) { + const toolName = toolCall.name; + const toolArgs = toolCall.args; allToolCalls.push({ name: toolName, args: toolArgs }); if (callbacks.onToolCall) { @@ -826,7 +867,7 @@ export class OpenCodeManager { // Check if this is a render tool if (isRenderTool(toolName)) { - const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs); + const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record); if (a2uiMessages) { emitA2UIMessages(a2uiMessages); } @@ -843,7 +884,7 @@ export class OpenCodeManager { continue; } - const result = await this.executeTool(toolName, toolArgs); + const result = await this.executeTool(toolName, toolArgs as Record); if (callbacks.onToolResult) { callbacks.onToolResult({ name: toolName, result }); diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts new file mode 100644 index 0000000..6d755fb --- /dev/null +++ b/src/main/engine/streaming.ts @@ -0,0 +1,529 @@ +/** + * SSE Streaming Infrastructure + * + * Provides SSE line parsing, event parsers for OpenAI/Mistral and Anthropic + * stream formats, tool-call accumulation, and retry-with-exponential-backoff. 
/**
 * Used by OpenCodeManager to convert buffered HTTP calls to real-time
 * token-by-token streaming for all chat providers.
 */

import https from 'https';
import http from 'http';
import { URL } from 'url';

// ── Types ──

/** One parsed Server-Sent Event. */
export interface SSEEvent {
  /** Value of the `event:` field, when the event carried one. */
  event?: string;
  /** All `data:` lines of the event, joined with newlines. */
  data: string;
}

/** What a provider-specific parser extracted from a single SSE event. */
export interface StreamEventResult {
  /** Text content delta to emit to UI */
  textDelta?: string;
  /** Whether the stream is complete */
  done: boolean;
  /** Finish reason from the model */
  finishReason?: string;
  /** Token usage information */
  usage?: {
    promptTokens?: number;
    completionTokens?: number;
    totalTokens?: number;
    inputTokens?: number;
    outputTokens?: number;
    cacheReadTokens?: number;
    cacheWriteTokens?: number;
  };
}

/** A tool call being assembled from stream fragments; `arguments` is raw JSON text. */
interface ToolCallAccumulator {
  id: string;
  name: string;
  arguments: string;
}

/** Tool calls collected while parsing an OpenAI-format stream, keyed by tool-call index. */
export interface OpenAIStreamAccumulator {
  toolCalls: Map<number, ToolCallAccumulator>;
}

/** Tool calls collected while parsing an Anthropic-format stream, keyed by content-block index. */
export interface AnthropicStreamAccumulator {
  toolCalls: Map<number, ToolCallAccumulator>;
}

/** Error carrying the HTTP details that `withRetry` inspects. */
export interface HttpStreamError extends Error {
  statusCode?: number;
  /** Server-requested wait before retrying, in seconds. */
  retryAfter?: number;
  isAbort?: boolean;
}

// ── SSE Line Parsing ──

/**
 * Parse raw SSE text into structured events.
 *
 * SSE protocol: events are separated by double-newlines (\n\n).
 * Each event can have `event:` and `data:` lines.
 * Multiple `data:` lines within one event are concatenated with newlines.
 * Lines starting with `:` are comments (ignored).
 *
 * An event whose data is empty (for example a bare `data:` keep-alive) is not
 * emitted: the SSE specification does not dispatch such events, and passing an
 * empty string to the JSON-based stream parsers would make them throw.
 *
 * @param text - Buffered stream text: the previous call's `remaining` plus the new chunk.
 * @returns The complete events found in `text`, and the trailing incomplete
 *   text that must be prepended to the next chunk.
 */
export function parseSSELines(text: string): { events: SSEEvent[]; remaining: string } {
  const events: SSEEvent[] = [];

  // Normalize \r\n to \n so one boundary check covers both line-ending styles.
  const normalized = text.replace(/\r\n/g, '\n');

  // A blank line (double newline) terminates an event.
  const blocks = normalized.split('\n\n');

  // Without a trailing boundary the last block is still being received.
  const remaining = normalized.endsWith('\n\n') ? '' : blocks.pop() ?? '';

  for (const block of blocks) {
    if (!block.trim()) continue;

    let eventType: string | undefined;
    const dataLines: string[] = [];

    for (const line of block.split('\n')) {
      // Comment lines start with ':'
      if (line.startsWith(':')) continue;

      if (line.startsWith('event:')) {
        eventType = line.slice('event:'.length).trim();
      } else if (line.startsWith('data:')) {
        dataLines.push(line.slice('data:'.length).trimStart());
      }
    }

    const data = dataLines.join('\n');

    // Empty data buffer: nothing to dispatch.
    if (data.length === 0) continue;

    events.push({ event: eventType, data });
  }

  return { events, remaining };
}

// ── Accumulator Factories ──

/** Create an empty accumulator for one OpenAI-format streaming response. */
export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator {
  return { toolCalls: new Map() };
}

/** Create an empty accumulator for one Anthropic-format streaming response. */
export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator {
  return { toolCalls: new Map() };
}

// ── OpenAI/Mistral SSE Parser ──
/** One entry of `delta.tool_calls` in an OpenAI-format stream chunk. */
interface OpenAIToolCallDelta {
  index?: number;
  id?: string;
  function?: { name?: string; arguments?: string };
}

/** The fields of an OpenAI-format stream chunk that this parser reads. */
interface OpenAIStreamChunk {
  choices?: Array<{
    delta?: { content?: unknown; tool_calls?: OpenAIToolCallDelta[] };
    finish_reason?: string | null;
  }>;
  usage?: {
    prompt_tokens?: number;
    completion_tokens?: number;
    total_tokens?: number;
    prompt_tokens_details?: { cached_tokens?: number };
  } | null;
  error?: { message?: string } | string;
}

/**
 * Parse a single OpenAI/Mistral SSE event and update the accumulator.
 *
 * OpenAI streaming format:
 * - Text deltas: choices[0].delta.content
 * - Tool call start: delta.tool_calls[i] with id + function.name
 * - Tool call fragments: delta.tool_calls[i].function.arguments (append)
 * - Finish reason: choices[0].finish_reason
 * - Usage: usage object in final chunk (requires stream_options.include_usage)
 * - [DONE] sentinel: stop iteration
 *
 * @param event - The SSE event to interpret; `event.data` is `[DONE]` or a JSON chunk.
 * @param accumulator - Receives tool-call ids, names and argument fragments
 *   across calls; mutated in place.
 * @returns The text delta, finish reason, usage and done flag found in this event.
 * @throws SyntaxError when `event.data` is neither `[DONE]` nor valid JSON.
 * @throws Error when the chunk carries an `error` payload, matching how the
 *   Anthropic parser reports mid-stream errors.
 */
export function parseOpenAIStreamEvent(
  event: SSEEvent,
  accumulator: OpenAIStreamAccumulator,
): StreamEventResult {
  // Trim so a sentinel followed by stray whitespace is still recognized.
  const payload = event.data.trim();
  if (payload === '[DONE]') {
    return { done: true };
  }

  const chunk = JSON.parse(payload) as OpenAIStreamChunk | null;
  const result: StreamEventResult = { done: false };

  // A JSON `null` or primitive carries nothing to extract.
  if (chunk === null || typeof chunk !== 'object') {
    return result;
  }

  // Surface mid-stream errors instead of ending with silently empty output.
  if (chunk.error) {
    const message = typeof chunk.error === 'string' ? chunk.error : chunk.error.message;
    throw new Error(message || 'Unknown streaming error');
  }

  const choice = chunk.choices?.[0];
  if (choice) {
    const delta = choice.delta;

    // Text content delta (empty strings are not worth emitting)
    if (typeof delta?.content === 'string' && delta.content.length > 0) {
      result.textDelta = delta.content;
    }

    // Tool calls
    for (const [position, toolCall] of (delta?.tool_calls ?? []).entries()) {
      // When `index` is absent, fall back to the position inside this delta so
      // separate tool calls do not collapse onto a single `undefined` key.
      const key = typeof toolCall.index === 'number' ? toolCall.index : position;

      let entry = accumulator.toolCalls.get(key);
      if (!entry) {
        // Created on first sight, even for a bare argument fragment, so that
        // no part of the arguments is dropped.
        entry = { id: '', name: '', arguments: '' };
        accumulator.toolCalls.set(key, entry);
      }

      if (toolCall.id) entry.id = toolCall.id;
      if (toolCall.function?.name) entry.name = toolCall.function.name;
      if (toolCall.function?.arguments) entry.arguments += toolCall.function.arguments;
    }

    // Finish reason
    if (choice.finish_reason) {
      result.finishReason = choice.finish_reason;
    }
  }

  // Token usage (arrives in final chunk with stream_options.include_usage)
  if (chunk.usage) {
    result.usage = {
      promptTokens: chunk.usage.prompt_tokens,
      completionTokens: chunk.usage.completion_tokens,
      totalTokens: chunk.usage.total_tokens,
    };

    // Only set when reported, so results without cache details keep their shape.
    const cachedTokens = chunk.usage.prompt_tokens_details?.cached_tokens;
    if (typeof cachedTokens === 'number') {
      result.usage.cacheReadTokens = cachedTokens;
    }
  }

  return result;
}
// ── Anthropic SSE Parser ──

/** Token counters as they appear in Anthropic stream payloads. */
interface AnthropicUsagePayload {
  input_tokens?: number;
  output_tokens?: number;
  cache_read_input_tokens?: number;
  cache_creation_input_tokens?: number;
}

/** The fields of an Anthropic stream payload that this parser reads. */
interface AnthropicStreamPayload {
  type?: string;
  index?: number;
  message?: { usage?: AnthropicUsagePayload };
  content_block?: { type?: string; id?: string; name?: string };
  delta?: { type?: string; text?: string; partial_json?: string; stop_reason?: string | null };
  usage?: AnthropicUsagePayload;
  error?: { message?: string };
}

/**
 * Parse a single Anthropic SSE event and update the accumulator.
 *
 * Anthropic streaming format uses named event types:
 * - message_start: input token usage
 * - content_block_start: text or tool_use block begins
 * - content_block_delta: text_delta or input_json_delta
 * - content_block_stop: block ends
 * - message_delta: output tokens + stop_reason
 * - message_stop: stream complete
 * - ping: keep-alive (ignored)
 * - error: server error mid-stream
 *
 * The event type is taken from the SSE `event:` field; when that field is
 * missing, the payload's own `type` property is used instead, so the event is
 * not silently ignored.
 *
 * @param event - The SSE event to interpret; `event.data` must be JSON.
 * @param accumulator - Receives tool_use blocks and their argument fragments,
 *   keyed by content-block index; mutated in place.
 * @returns The text delta, finish reason, usage and done flag found in this event.
 * @throws SyntaxError when `event.data` is not valid JSON.
 * @throws Error for an `error` event, using the server-provided message when present.
 */
export function parseAnthropicStreamEvent(
  event: SSEEvent,
  accumulator: AnthropicStreamAccumulator,
): StreamEventResult {
  const payload: AnthropicStreamPayload =
    (JSON.parse(event.data) as AnthropicStreamPayload | null) ?? {};
  const result: StreamEventResult = { done: false };
  const eventType = event.event || payload.type;

  switch (eventType) {
    case 'message_start': {
      const usage = payload.message?.usage;
      if (usage) {
        result.usage = {
          inputTokens: usage.input_tokens ?? 0,
          cacheReadTokens: usage.cache_read_input_tokens ?? 0,
          cacheWriteTokens: usage.cache_creation_input_tokens ?? 0,
        };
      }
      break;
    }

    case 'content_block_start': {
      const block = payload.content_block;
      // Only tool_use blocks need tracking; a text block starts empty.
      if (block?.type === 'tool_use' && typeof payload.index === 'number') {
        accumulator.toolCalls.set(payload.index, {
          id: block.id ?? '',
          name: block.name ?? '',
          arguments: '',
        });
      }
      break;
    }

    case 'content_block_delta': {
      const delta = payload.delta;
      if (delta?.type === 'text_delta' && delta.text) {
        result.textDelta = delta.text;
      } else if (
        delta?.type === 'input_json_delta' &&
        delta.partial_json &&
        typeof payload.index === 'number'
      ) {
        const toolCall = accumulator.toolCalls.get(payload.index);
        if (toolCall) {
          toolCall.arguments += delta.partial_json;
        }
      }
      break;
    }

    case 'content_block_stop':
      // Block is complete. Tool arguments can now be parsed by the caller.
      break;

    case 'message_delta': {
      if (payload.usage) {
        result.usage = { outputTokens: payload.usage.output_tokens ?? 0 };
      }
      if (payload.delta?.stop_reason) {
        result.finishReason = payload.delta.stop_reason;
      }
      break;
    }

    case 'message_stop':
      result.done = true;
      break;

    case 'ping':
      // Keep-alive, ignore
      break;

    case 'error':
      throw new Error(payload.error?.message || 'Unknown streaming error');

    default:
      // Unknown event type, ignore
      break;
  }

  return result;
}

// ── Retry with Exponential Backoff ──

const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);

/**
 * Retry a function with exponential backoff for transient HTTP errors.
 *
 * Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable)
 * and on errors that carry no status code at all (e.g. network failures).
 * Does NOT retry on other status codes or on abort.
 * When the error carries `retryAfter` (seconds), waits at least that long.
 *
 * @param fn - The operation to attempt; invoked again for every retry.
 * @param options - `maxRetries`: retries after the first attempt (default 3).
 *   `baseDelayMs`: delay before the first retry, doubled for each later one
 *   (default 1000). `maxJitterMs`: upper bound of the random extra delay
 *   (default 500).
 * @returns The value of the first successful attempt.
 * @throws The error of the last attempt, or immediately the first
 *   non-retryable error.
 */
export async function withRetry<T>(
  fn: () => Promise<T>,
  options: { maxRetries?: number; baseDelayMs?: number; maxJitterMs?: number } = {},
): Promise<T> {
  const maxRetries = options.maxRetries ?? 3;
  const baseDelayMs = options.baseDelayMs ?? 1000;
  const maxJitterMs = options.maxJitterMs ?? 500;
  let lastError: unknown;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error: unknown) {
      lastError = error;
      // `?? {}` keeps the property reads below safe if null/undefined was thrown.
      const httpError = (error ?? {}) as HttpStreamError;

      // Don't retry on abort
      if (httpError.isAbort || httpError.message === 'Request cancelled') {
        throw error;
      }

      // Don't retry on non-retryable status codes
      if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) {
        throw error;
      }

      // Don't retry if we've exhausted retries
      if (attempt >= maxRetries) {
        throw error;
      }

      // Exponential backoff (base, 2x base, 4x base, ...) plus random jitter
      let delay = baseDelayMs * Math.pow(2, attempt) + Math.random() * maxJitterMs;

      // Never retry sooner than the server asked for
      if (httpError.retryAfter && httpError.retryAfter > 0) {
        delay = Math.max(delay, httpError.retryAfter * 1000);
      }

      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }

  throw lastError;
}

// ── HTTP Streaming Request ──

interface HttpStreamOptions {
  method?: string;
  headers?: Record<string, string>;
  body?: string;
  signal?: AbortSignal;
  /** Timeout in milliseconds passed to the request (default 120000). */
  timeout?: number;
}

/** Convert a Retry-After header (delta-seconds or HTTP-date) into seconds. */
function parseRetryAfterSeconds(header: string | string[] | undefined): number | undefined {
  const raw = Array.isArray(header) ? header[0] : header;
  if (!raw) return undefined;

  const value = raw.trim();
  if (/^\d/.test(value)) {
    const seconds = parseInt(value, 10);
    return Number.isNaN(seconds) ? undefined : seconds;
  }

  const dateMs = Date.parse(value);
  if (Number.isNaN(dateMs)) return undefined;
  return Math.max(0, Math.ceil((dateMs - Date.now()) / 1000));
}

/** Build the error thrown for a non-2xx response, preferring the API's own message. */
function buildHttpError(
  statusCode: number,
  retryAfterHeader: string | string[] | undefined,
  body: string,
): HttpStreamError {
  const error: HttpStreamError = new Error(`API error: ${statusCode}`);
  error.statusCode = statusCode;

  const retryAfter = parseRetryAfterSeconds(retryAfterHeader);
  if (retryAfter !== undefined) {
    error.retryAfter = retryAfter;
  }

  // Try to extract a better error message
  try {
    const parsed = JSON.parse(body);
    error.message = parsed.error?.message || parsed.message || error.message;
  } catch {
    if (body.length > 0) {
      error.message = `${error.message}: ${body.slice(0, 200)}`;
    }
  }

  return error;
}

/** Expose a 2xx response body as a single-consumer async iterable of SSE events. */
function createEventStream(
  res: http.IncomingMessage,
  describeFailure: (cause?: Error) => Error,
  onSettled: () => void,
): AsyncIterable<SSEEvent> {
  type Waiter = {
    resolve: (result: IteratorResult<SSEEvent>) => void;
    reject: (error: Error) => void;
  };

  const queue: SSEEvent[] = [];
  let buffer = '';
  let finished = false;
  let failure: Error | null = null;
  let waiter: Waiter | null = null;

  // End the stream exactly once, waking a consumer that is awaiting next().
  const finish = (error?: Error): void => {
    if (finished) return;
    finished = true;
    onSettled();

    if (waiter) {
      const pending = waiter;
      waiter = null;
      if (error) pending.reject(error);
      else pending.resolve({ value: undefined, done: true });
    } else if (error) {
      // Nobody is waiting: keep the error so the next next() call reports it.
      failure = error;
    }
  };

  res.on('data', (chunk: string) => {
    if (finished) return;

    buffer += chunk;
    const { events: parsed, remaining } = parseSSELines(buffer);
    buffer = remaining;

    for (const event of parsed) {
      if (waiter) {
        const pending = waiter;
        waiter = null;
        pending.resolve({ value: event, done: false });
      } else {
        queue.push(event);
      }
    }
  });

  res.on('end', () => finish());
  res.on('error', (err: Error) => finish(describeFailure(err)));
  // 'close' without a preceding 'end' means the body was cut short.
  res.on('close', () => finish(describeFailure()));

  const iterator: AsyncIterator<SSEEvent> = {
    next(): Promise<IteratorResult<SSEEvent>> {
      // Events that already arrived are delivered before any end or error.
      const queued = queue.shift();
      if (queued) {
        return Promise.resolve({ value: queued, done: false });
      }

      if (failure) {
        const error = failure;
        failure = null;
        return Promise.reject(error);
      }

      if (finished) {
        return Promise.resolve({ value: undefined, done: true });
      }

      return new Promise<IteratorResult<SSEEvent>>((resolve, reject) => {
        waiter = { resolve, reject };
      });
    },

    // Invoked when the consumer leaves a `for await` loop early.
    return(): Promise<IteratorResult<SSEEvent>> {
      queue.length = 0;
      finish();
      res.destroy();
      return Promise.resolve({ value: undefined, done: true });
    },
  };

  return { [Symbol.asyncIterator]: () => iterator };
}

/**
 * Make an HTTP request that returns an async iterable of SSE events.
 *
 * Uses Node.js http/https modules directly, reading the response
 * as a readable stream and parsing SSE events incrementally.
 *
 * The response is decoded as UTF-8 by the stream itself, so characters split
 * across chunks stay intact. Errors, premature closes and timeouts that occur
 * after the headers arrived are reported through the iterator. Leaving the
 * iteration early destroys the response.
 *
 * @param urlStr - Absolute http: or https: URL.
 * @param options - Method (default POST), headers, body, abort signal and timeout.
 * @returns The status code and a single-consumer async iterable of events.
 * @throws HttpStreamError with `statusCode` (and `retryAfter` when the server
 *   sent Retry-After) on a non-2xx response, or with `isAbort` when the signal
 *   was aborted.
 */
export function httpRequestStream(
  urlStr: string,
  options: HttpStreamOptions,
): Promise<{
  statusCode: number;
  events: AsyncIterable<SSEEvent>;
}> {
  return new Promise((resolve, reject) => {
    const { signal } = options;

    // Already cancelled: fail before opening a connection.
    if (signal?.aborted) {
      const cancelled: HttpStreamError = new Error('Request cancelled');
      cancelled.isAbort = true;
      reject(cancelled);
      return;
    }

    const url = new URL(urlStr);
    const transport = url.protocol === 'https:' ? https : http;
    const timeout = options.timeout ?? 120000;
    let timedOut = false;

    let abortListener: (() => void) | undefined;
    const detachAbortListener = (): void => {
      if (signal && abortListener) {
        signal.removeEventListener('abort', abortListener);
      }
      abortListener = undefined;
    };

    // Map a low-level failure to the error callers should see.
    const describeFailure = (cause?: Error): HttpStreamError => {
      if (timedOut) return new Error('Request timed out');
      const failure: HttpStreamError = cause ?? new Error('Stream closed before completion');
      if (signal?.aborted) failure.isAbort = true;
      return failure;
    };

    const req = transport.request(url, {
      method: options.method || 'POST',
      headers: options.headers || {},
      timeout,
    }, (res) => {
      const statusCode = res.statusCode ?? 0;
      res.setEncoding('utf8');

      // Non-2xx: collect error body and throw
      if (statusCode < 200 || statusCode >= 300) {
        let errorBody = '';
        res.on('data', (chunk: string) => { errorBody += chunk; });
        res.on('error', (err: Error) => {
          detachAbortListener();
          reject(describeFailure(err));
        });
        res.on('end', () => {
          detachAbortListener();
          reject(buildHttpError(statusCode, res.headers['retry-after'], errorBody));
        });
        return;
      }

      // 2xx: hand the body over as an async iterable of SSE events
      resolve({
        statusCode,
        events: createEventStream(res, describeFailure, detachAbortListener),
      });
    });

    req.on('error', (err: Error) => {
      detachAbortListener();
      reject(describeFailure(err));
    });

    req.on('timeout', () => {
      timedOut = true;
      req.destroy();
      reject(new Error('Request timed out'));
    });

    if (signal) {
      abortListener = () => { req.destroy(); };
      signal.addEventListener('abort', abortListener, { once: true });
    }

    if (options.body) {
      req.write(options.body);
    }
    req.end();
  });
}
+ * - Tool-call argument accumulation during streaming + * - Error handling (mid-stream errors, non-2xx status, abort) + * - Retry with exponential backoff (429/502/503, Retry-After, no retry on 4xx/abort) + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { + parseSSELines, + parseOpenAIStreamEvent, + parseAnthropicStreamEvent, + withRetry, + type SSEEvent, + type OpenAIStreamAccumulator, + type AnthropicStreamAccumulator, + createOpenAIStreamAccumulator, + createAnthropicStreamAccumulator, +} from '../../src/main/engine/streaming'; + +// ── SSE Line Parsing ── + +describe('parseSSELines', () => { + it('parses a complete SSE event from a single chunk', () => { + const buffer = ''; + const chunk = 'data: {"id":"1","choices":[{"delta":{"content":"Hello"}}]}\n\n'; + const { events, remaining } = parseSSELines(buffer + chunk); + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ event: undefined, data: '{"id":"1","choices":[{"delta":{"content":"Hello"}}]}' }); + expect(remaining).toBe(''); + }); + + it('handles partial lines across TCP chunks', () => { + // First chunk ends mid-line + const chunk1 = 'data: {"id":"1","cho'; + const { events: events1, remaining: rem1 } = parseSSELines(chunk1); + expect(events1).toHaveLength(0); + expect(rem1).toBe('data: {"id":"1","cho'); + + // Second chunk completes the line + const chunk2 = 'ices":[{"delta":{"content":"Hello"}}]}\n\n'; + const { events: events2, remaining: rem2 } = parseSSELines(rem1 + chunk2); + expect(events2).toHaveLength(1); + expect(events2[0].data).toBe('{"id":"1","choices":[{"delta":{"content":"Hello"}}]}'); + expect(rem2).toBe(''); + }); + + it('handles multiple events in a single chunk', () => { + const chunk = 'data: {"a":1}\n\ndata: {"b":2}\n\n'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(2); + expect(events[0].data).toBe('{"a":1}'); + expect(events[1].data).toBe('{"b":2}'); + expect(remaining).toBe(''); + }); + + it('handles 
named event types (Anthropic format)', () => { + const chunk = 'event: message_start\ndata: {"type":"message_start"}\n\n'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(1); + expect(events[0].event).toBe('message_start'); + expect(events[0].data).toBe('{"type":"message_start"}'); + expect(remaining).toBe(''); + }); + + it('handles [DONE] sentinel', () => { + const chunk = 'data: [DONE]\n\n'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(1); + expect(events[0].data).toBe('[DONE]'); + expect(remaining).toBe(''); + }); + + it('ignores empty data lines (keep-alive pings)', () => { + const chunk = ':\n\ndata: {"a":1}\n\n'; + const { events, remaining } = parseSSELines(chunk); + // The comment line ':' should be ignored + expect(events).toHaveLength(1); + expect(events[0].data).toBe('{"a":1}'); + expect(remaining).toBe(''); + }); + + it('handles multiple data lines for a single event (concatenation per SSE spec)', () => { + const chunk = 'data: line1\ndata: line2\n\n'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(1); + expect(events[0].data).toBe('line1\nline2'); + expect(remaining).toBe(''); + }); + + it('returns incomplete data as remaining buffer', () => { + const chunk = 'data: {"partial'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(0); + expect(remaining).toBe('data: {"partial'); + }); + + it('handles \\r\\n line endings', () => { + const chunk = 'data: {"a":1}\r\n\r\n'; + const { events, remaining } = parseSSELines(chunk); + expect(events).toHaveLength(1); + expect(events[0].data).toBe('{"a":1}'); + expect(remaining).toBe(''); + }); +}); + +// ── OpenAI/Mistral Stream Event Parsing ── + +describe('parseOpenAIStreamEvent', () => { + let accumulator: OpenAIStreamAccumulator; + + beforeEach(() => { + accumulator = createOpenAIStreamAccumulator(); + }); + + it('extracts text delta from content field', () => { + const 
event: SSEEvent = { + data: JSON.stringify({ + id: 'chatcmpl-1', + choices: [{ delta: { content: 'Hello' }, index: 0 }], + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.textDelta).toBe('Hello'); + expect(result.done).toBe(false); + }); + + it('accumulates tool call start (id + name)', () => { + const event: SSEEvent = { + data: JSON.stringify({ + id: 'chatcmpl-1', + choices: [{ + delta: { + tool_calls: [{ + index: 0, + id: 'call_abc', + function: { name: 'search_posts', arguments: '' }, + }], + }, + index: 0, + }], + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + expect(accumulator.toolCalls.get(0)).toEqual({ + id: 'call_abc', + name: 'search_posts', + arguments: '', + }); + }); + + it('accumulates tool call argument fragments', () => { + // First event: tool call start + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { + tool_calls: [{ + index: 0, id: 'call_abc', + function: { name: 'search_posts', arguments: '' }, + }], + }, + index: 0, + }], + }), + }, accumulator); + + // Second event: argument fragment + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { + tool_calls: [{ + index: 0, + function: { arguments: '{"query"' }, + }], + }, + index: 0, + }], + }), + }, accumulator); + + // Third event: more arguments + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { + tool_calls: [{ + index: 0, + function: { arguments: ': "test"}' }, + }], + }, + index: 0, + }], + }), + }, accumulator); + + expect(accumulator.toolCalls.get(0)?.arguments).toBe('{"query": "test"}'); + }); + + it('handles multiple concurrent tool calls', () => { + // Tool call 0 + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { + tool_calls: [ + { index: 0, id: 'call_1', function: { name: 'search_posts', arguments: '{"q":"a"}' } }, + { index: 1, id: 'call_2', function: { name: 'list_posts', 
arguments: '{"limit":5}' } }, + ], + }, + index: 0, + }], + }), + }, accumulator); + + expect(accumulator.toolCalls.get(0)?.name).toBe('search_posts'); + expect(accumulator.toolCalls.get(1)?.name).toBe('list_posts'); + }); + + it('detects finish_reason stop', () => { + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, finish_reason: 'stop', index: 0 }], + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.finishReason).toBe('stop'); + }); + + it('detects finish_reason tool_calls', () => { + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, finish_reason: 'tool_calls', index: 0 }], + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.finishReason).toBe('tool_calls'); + }); + + it('extracts token usage from final chunk', () => { + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, index: 0 }], + usage: { + prompt_tokens: 150, + completion_tokens: 42, + total_tokens: 192, + }, + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.usage).toEqual({ + promptTokens: 150, + completionTokens: 42, + totalTokens: 192, + }); + }); + + it('handles [DONE] sentinel', () => { + const event: SSEEvent = { data: '[DONE]' }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.done).toBe(true); + }); + + it('returns empty result for empty content delta', () => { + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: { content: '' }, index: 0 }], + }), + }; + const result = parseOpenAIStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + }); +}); + +// ── Anthropic Stream Event Parsing ── + +describe('parseAnthropicStreamEvent', () => { + let accumulator: AnthropicStreamAccumulator; + + beforeEach(() => { + accumulator = createAnthropicStreamAccumulator(); + }); + + it('extracts input_tokens from message_start', () => { + const event: 
SSEEvent = { + event: 'message_start', + data: JSON.stringify({ + type: 'message_start', + message: { + id: 'msg_1', + model: 'claude-sonnet-4-5', + usage: { + input_tokens: 150, + cache_read_input_tokens: 50, + cache_creation_input_tokens: 10, + }, + }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.usage).toEqual({ + inputTokens: 150, + cacheReadTokens: 50, + cacheWriteTokens: 10, + }); + }); + + it('handles text content_block_start (no-op)', () => { + const event: SSEEvent = { + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'text', text: '' }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + }); + + it('handles tool_use content_block_start', () => { + const event: SSEEvent = { + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 1, + content_block: { type: 'tool_use', id: 'toolu_abc', name: 'search_posts' }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + expect(accumulator.toolCalls.get(1)).toEqual({ + id: 'toolu_abc', + name: 'search_posts', + arguments: '', + }); + }); + + it('extracts text_delta from content_block_delta', () => { + const event: SSEEvent = { + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'Hello world' }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBe('Hello world'); + }); + + it('accumulates tool input_json_delta fragments', () => { + // Start tool block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 1, + content_block: { type: 'tool_use', id: 'toolu_abc', name: 'search_posts' }, + }), + }, accumulator); + + // 
First argument fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 1, + delta: { type: 'input_json_delta', partial_json: '{"query"' }, + }), + }, accumulator); + + // Second argument fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 1, + delta: { type: 'input_json_delta', partial_json: ': "test"}' }, + }), + }, accumulator); + + expect(accumulator.toolCalls.get(1)?.arguments).toBe('{"query": "test"}'); + }); + + it('extracts output_tokens from message_delta', () => { + const event: SSEEvent = { + event: 'message_delta', + data: JSON.stringify({ + type: 'message_delta', + delta: { stop_reason: 'end_turn' }, + usage: { output_tokens: 42 }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.usage).toEqual({ outputTokens: 42 }); + expect(result.finishReason).toBe('end_turn'); + }); + + it('signals done on message_stop', () => { + const event: SSEEvent = { + event: 'message_stop', + data: JSON.stringify({ type: 'message_stop' }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.done).toBe(true); + }); + + it('ignores ping events', () => { + const event: SSEEvent = { + event: 'ping', + data: JSON.stringify({ type: 'ping' }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + expect(result.done).toBe(false); + }); + + it('throws on error events', () => { + const event: SSEEvent = { + event: 'error', + data: JSON.stringify({ + type: 'error', + error: { type: 'overloaded_error', message: 'Server is overloaded' }, + }), + }; + expect(() => parseAnthropicStreamEvent(event, accumulator)).toThrow('Server is overloaded'); + }); + + it('signals tool_use finish reason from message_delta', () => { + const event: SSEEvent = { + event: 'message_delta', + data: JSON.stringify({ + 
type: 'message_delta', + delta: { stop_reason: 'tool_use' }, + usage: { output_tokens: 10 }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.finishReason).toBe('tool_use'); + }); +}); + +// ── Tool Call Accumulation ── + +describe('tool call accumulation', () => { + it('OpenAI: builds complete tool calls from fragments', () => { + const acc = createOpenAIStreamAccumulator(); + + // Start + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { + tool_calls: [{ + index: 0, id: 'call_1', + function: { name: 'search_posts', arguments: '' }, + }], + }, + index: 0, + }], + }), + }, acc); + + // Fragments + for (const frag of ['{"', 'query', '": "', 'hello', '"}']) { + parseOpenAIStreamEvent({ + data: JSON.stringify({ + choices: [{ + delta: { tool_calls: [{ index: 0, function: { arguments: frag } }] }, + index: 0, + }], + }), + }, acc); + } + + const tc = acc.toolCalls.get(0)!; + expect(tc.id).toBe('call_1'); + expect(tc.name).toBe('search_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ query: 'hello' }); + }); + + it('Anthropic: builds complete tool calls from fragments', () => { + const acc = createAnthropicStreamAccumulator(); + + // Start block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 1, + content_block: { type: 'tool_use', id: 'toolu_1', name: 'list_posts' }, + }), + }, acc); + + // Fragments + for (const frag of ['{"', 'limit', '": ', '5}']) { + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 1, + delta: { type: 'input_json_delta', partial_json: frag }, + }), + }, acc); + } + + const tc = acc.toolCalls.get(1)!; + expect(tc.id).toBe('toolu_1'); + expect(tc.name).toBe('list_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ limit: 5 }); + }); +}); + +// ── Retry with Exponential Backoff ── + +describe('withRetry', () => { + 
beforeEach(() => { + vi.useFakeTimers(); + }); + + it('returns result on first successful call', async () => { + const fn = vi.fn().mockResolvedValue('success'); + const promise = withRetry(fn, { maxRetries: 3 }); + const result = await promise; + expect(result).toBe('success'); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('retries on 429 status and succeeds', async () => { + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('success'); + + const promise = withRetry(fn, { maxRetries: 3 }); + // Advance past the retry delay + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('success'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + it('retries on 502 status', async () => { + const error502 = Object.assign(new Error('Bad Gateway'), { statusCode: 502 }); + const fn = vi.fn() + .mockRejectedValueOnce(error502) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + it('retries on 503 status', async () => { + const error503 = Object.assign(new Error('Service Unavailable'), { statusCode: 503 }); + const fn = vi.fn() + .mockRejectedValueOnce(error503) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + it('does NOT retry on 400 status', async () => { + const error400 = Object.assign(new Error('Bad Request'), { statusCode: 400 }); + const fn = vi.fn().mockRejectedValue(error400); + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Bad Request'); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does NOT retry on 401 status', async () => { + 
const error401 = Object.assign(new Error('Unauthorized'), { statusCode: 401 }); + const fn = vi.fn().mockRejectedValue(error401); + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Unauthorized'); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does NOT retry on 403 status', async () => { + const error403 = Object.assign(new Error('Forbidden'), { statusCode: 403 }); + const fn = vi.fn().mockRejectedValue(error403); + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Forbidden'); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does NOT retry on abort', async () => { + const abortError = Object.assign(new Error('Request cancelled'), { isAbort: true }); + const fn = vi.fn().mockRejectedValue(abortError); + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Request cancelled'); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('exhausts max retries and throws last error', async () => { + vi.useRealTimers(); // Real timers work better for this test + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + let callCount = 0; + const fn = vi.fn().mockImplementation(() => { + callCount++; + return Promise.reject(error429); + }); + + await expect(withRetry(fn, { maxRetries: 2 })).rejects.toThrow('Rate limited'); + expect(fn).toHaveBeenCalledTimes(3); // 1 initial + 2 retries + vi.useFakeTimers(); // Restore for afterEach + }); + + it('respects Retry-After header for 429', async () => { + const error429 = Object.assign(new Error('Rate limited'), { + statusCode: 429, + retryAfter: 5, + }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + // Should NOT have retried yet at 3 seconds (Retry-After is 5) + await vi.advanceTimersByTimeAsync(3000); + expect(fn).toHaveBeenCalledTimes(1); + // Advance past the Retry-After + await vi.advanceTimersByTimeAsync(3000); + const result = await promise; + 
expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + }); + + afterEach(() => { + vi.useRealTimers(); + }); +}); + +// ── Full stream-to-result integration ── + +describe('stream event sequences', () => { + it('OpenAI: processes a complete text response stream', () => { + const acc = createOpenAIStreamAccumulator(); + const textChunks: string[] = []; + + const events: SSEEvent[] = [ + { data: JSON.stringify({ choices: [{ delta: { role: 'assistant' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: 'Hello' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: ' world' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { content: '!' }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: {}, finish_reason: 'stop', index: 0 }], usage: { prompt_tokens: 10, completion_tokens: 3, total_tokens: 13 } }) }, + { data: '[DONE]' }, + ]; + + for (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Hello world!'); + }); + + it('Anthropic: processes a complete text response stream', () => { + const acc = createAnthropicStreamAccumulator(); + const textChunks: string[] = []; + + const events: SSEEvent[] = [ + { event: 'message_start', data: JSON.stringify({ type: 'message_start', message: { id: 'msg_1', model: 'claude-sonnet-4', usage: { input_tokens: 100 } } }) }, + { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: ' world!' 
} }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 0 }) }, + { event: 'message_delta', data: JSON.stringify({ type: 'message_delta', delta: { stop_reason: 'end_turn' }, usage: { output_tokens: 5 } }) }, + { event: 'message_stop', data: JSON.stringify({ type: 'message_stop' }) }, + ]; + + for (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Hello world!'); + }); + + it('OpenAI: processes a tool call response stream', () => { + const acc = createOpenAIStreamAccumulator(); + + const events: SSEEvent[] = [ + { data: JSON.stringify({ choices: [{ delta: { role: 'assistant', tool_calls: [{ index: 0, id: 'call_1', function: { name: 'search_posts', arguments: '' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: '{"query"' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: { tool_calls: [{ index: 0, function: { arguments: ': "test"}' } }] }, index: 0 }] }) }, + { data: JSON.stringify({ choices: [{ delta: {}, finish_reason: 'tool_calls', index: 0 }] }) }, + { data: '[DONE]' }, + ]; + + for (const event of events) { + parseOpenAIStreamEvent(event, acc); + } + + expect(acc.toolCalls.size).toBe(1); + const tc = acc.toolCalls.get(0)!; + expect(tc.name).toBe('search_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ query: 'test' }); + }); + + it('Anthropic: processes a tool call response stream', () => { + const acc = createAnthropicStreamAccumulator(); + + const events: SSEEvent[] = [ + { event: 'message_start', data: JSON.stringify({ type: 'message_start', message: { id: 'msg_1', usage: { input_tokens: 100 } } }) }, + { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }) }, + { event: 'content_block_delta', data: 
JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Let me search.' } }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 0 }) }, + { event: 'content_block_start', data: JSON.stringify({ type: 'content_block_start', index: 1, content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' } }) }, + { event: 'content_block_delta', data: JSON.stringify({ type: 'content_block_delta', index: 1, delta: { type: 'input_json_delta', partial_json: '{"query": "test"}' } }) }, + { event: 'content_block_stop', data: JSON.stringify({ type: 'content_block_stop', index: 1 }) }, + { event: 'message_delta', data: JSON.stringify({ type: 'message_delta', delta: { stop_reason: 'tool_use' }, usage: { output_tokens: 20 } }) }, + { event: 'message_stop', data: JSON.stringify({ type: 'message_stop' }) }, + ]; + + const textChunks: string[] = []; + for (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) textChunks.push(result.textDelta); + } + + expect(textChunks.join('')).toBe('Let me search.'); + expect(acc.toolCalls.size).toBe(1); + const tc = acc.toolCalls.get(1)!; + expect(tc.name).toBe('search_posts'); + expect(JSON.parse(tc.arguments)).toEqual({ query: 'test' }); + }); +}); From 781cedade5c6b7834e84bb3560cd4e3dfff6888f Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 10:25:54 +0100 Subject: [PATCH 02/10] fix: SSE streaming review fixes - Fix OpenAI path using accumulatedText instead of round-specific text in assistant messages for multi-round tool conversations - Guard JSON.parse in both SSE parsers against corrupted events - Extract cacheReadTokens from OpenAI prompt_tokens_details when available - Add tests for JSON parse resilience and cache token extraction (7 new tests) --- src/main/engine/OpenCodeManager.ts | 9 ++- src/main/engine/streaming.ts | 51 ++++++++---- tests/engine/streaming.test.ts | 123 
+++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 20 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index c433ec1..3545d8e 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -761,6 +761,8 @@ export class OpenCodeManager { let promptTokens = 0; let completionTokens = 0; let totalTokens = 0; + let cacheReadTokens = 0; + let roundText = ''; // Text produced in this round only const { events } = await withRetry(() => httpRequestStream(ZEN_OPENAI_URL, { method: 'POST', @@ -778,6 +780,7 @@ export class OpenCodeManager { // Emit text deltas immediately for real-time streaming if (result.textDelta) { accumulatedText += result.textDelta; + roundText += result.textDelta; if (callbacks.onDelta) { callbacks.onDelta(result.textDelta); } @@ -788,6 +791,7 @@ export class OpenCodeManager { if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; } if (result.finishReason) { @@ -799,8 +803,7 @@ export class OpenCodeManager { // Emit token usage after stream completes if (callbacks.onTokenUsage) { - const cacheReadTokens = 0; // OpenAI doesn't provide cache info in streaming - const inputTokens = promptTokens; + const inputTokens = promptTokens - cacheReadTokens; const outputTokens = completionTokens; const prev = this.conversationUsage.get(conversationId) || { @@ -846,7 +849,7 @@ export class OpenCodeManager { // Build the assistant message with tool_calls for conversation history const assistantMessage: Record = { role: 'assistant', - content: accumulatedText || null, + content: roundText || null, tool_calls: parsedToolCalls.map((tc) => ({ id: tc.id, type: 'function', 
diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 6d755fb..99c4d5b 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -142,8 +142,15 @@ export function parseOpenAIStreamEvent( return { done: true }; } - const data = JSON.parse(event.data); - const choice = data.choices?.[0]; + let data: Record; + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data = JSON.parse(event.data) as any; + } catch { + // Skip corrupted SSE events (e.g. partial JSON from TCP split) + return { done: false }; + } + const choice = (data as any).choices?.[0]; const result: StreamEventResult = { done: false }; if (choice) { @@ -187,11 +194,14 @@ export function parseOpenAIStreamEvent( } // Token usage (arrives in final chunk with stream_options.include_usage) - if (data.usage) { + if ((data as any).usage) { + const usage = (data as any).usage; + const promptDetails = usage.prompt_tokens_details; result.usage = { - promptTokens: data.usage.prompt_tokens, - completionTokens: data.usage.completion_tokens, - totalTokens: data.usage.total_tokens, + promptTokens: usage.prompt_tokens, + completionTokens: usage.completion_tokens, + totalTokens: usage.total_tokens, + cacheReadTokens: promptDetails?.cached_tokens, }; } @@ -217,12 +227,19 @@ export function parseAnthropicStreamEvent( event: SSEEvent, accumulator: AnthropicStreamAccumulator, ): StreamEventResult { - const data = JSON.parse(event.data); + let data: Record; + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + data = JSON.parse(event.data) as any; + } catch { + // Skip corrupted SSE events (e.g. 
partial JSON from TCP split) + return { done: false }; + } const result: StreamEventResult = { done: false }; switch (event.event) { case 'message_start': { - const usage = data.message?.usage; + const usage = (data as any).message?.usage; if (usage) { result.usage = { inputTokens: usage.input_tokens || 0, @@ -234,9 +251,9 @@ export function parseAnthropicStreamEvent( } case 'content_block_start': { - const block = data.content_block; + const block = (data as any).content_block; if (block?.type === 'tool_use') { - accumulator.toolCalls.set(data.index, { + accumulator.toolCalls.set(data.index as number, { id: block.id, name: block.name, arguments: '', @@ -247,11 +264,11 @@ export function parseAnthropicStreamEvent( } case 'content_block_delta': { - const delta = data.delta; + const delta = (data as any).delta; if (delta?.type === 'text_delta' && delta.text) { result.textDelta = delta.text; } else if (delta?.type === 'input_json_delta' && delta.partial_json) { - const tc = accumulator.toolCalls.get(data.index); + const tc = accumulator.toolCalls.get(data.index as number); if (tc) { tc.arguments += delta.partial_json; } @@ -264,13 +281,13 @@ export function parseAnthropicStreamEvent( break; case 'message_delta': { - if (data.usage) { + if ((data as any).usage) { result.usage = { - outputTokens: data.usage.output_tokens || 0, + outputTokens: (data as any).usage.output_tokens || 0, }; } - if (data.delta?.stop_reason) { - result.finishReason = data.delta.stop_reason; + if ((data as any).delta?.stop_reason) { + result.finishReason = (data as any).delta.stop_reason; } break; } @@ -284,7 +301,7 @@ export function parseAnthropicStreamEvent( break; case 'error': { - const errorMsg = data.error?.message || 'Unknown streaming error'; + const errorMsg = (data as any).error?.message || 'Unknown streaming error'; throw new Error(errorMsg); } diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index df0d534..8f914e9 100644 --- 
a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -741,3 +741,126 @@ describe('stream event sequences', () => { expect(JSON.parse(tc.arguments)).toEqual({ query: 'test' }); }); }); + +// ── JSON Parse Error Resilience ── + +describe('parser JSON error resilience', () => { + it('OpenAI: skips corrupted SSE events with invalid JSON', () => { + const acc = createOpenAIStreamAccumulator(); + const event: SSEEvent = { data: '{corrupted json' }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.done).toBe(false); + expect(result.textDelta).toBeUndefined(); + }); + + it('OpenAI: recovers from corrupted event and processes subsequent valid events', () => { + const acc = createOpenAIStreamAccumulator(); + + // Corrupted event + parseOpenAIStreamEvent({ data: 'not-json' }, acc); + + // Valid event after corruption + const result = parseOpenAIStreamEvent({ + data: JSON.stringify({ choices: [{ delta: { content: 'OK' }, index: 0 }] }), + }, acc); + expect(result.textDelta).toBe('OK'); + }); + + it('Anthropic: skips corrupted SSE events with invalid JSON', () => { + const acc = createAnthropicStreamAccumulator(); + const event: SSEEvent = { event: 'content_block_delta', data: '{broken' }; + const result = parseAnthropicStreamEvent(event, acc); + expect(result.done).toBe(false); + expect(result.textDelta).toBeUndefined(); + }); + + it('Anthropic: recovers from corrupted event and processes subsequent valid events', () => { + const acc = createAnthropicStreamAccumulator(); + + // Corrupted event + parseAnthropicStreamEvent({ event: 'ping', data: 'not-json' }, acc); + + // Valid event after corruption + const result = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'Recovered' }, + }), + }, acc); + expect(result.textDelta).toBe('Recovered'); + }); +}); + +// ── OpenAI Cache Token Extraction ── + +describe('OpenAI cache token 
extraction', () => { + it('extracts cached_tokens from prompt_tokens_details', () => { + const acc = createOpenAIStreamAccumulator(); + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, index: 0 }], + usage: { + prompt_tokens: 150, + completion_tokens: 42, + total_tokens: 192, + prompt_tokens_details: { cached_tokens: 100 }, + }, + }), + }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.usage).toEqual({ + promptTokens: 150, + completionTokens: 42, + totalTokens: 192, + cacheReadTokens: 100, + }); + }); + + it('returns undefined cacheReadTokens when prompt_tokens_details is absent', () => { + const acc = createOpenAIStreamAccumulator(); + const event: SSEEvent = { + data: JSON.stringify({ + choices: [{ delta: {}, index: 0 }], + usage: { + prompt_tokens: 150, + completion_tokens: 42, + total_tokens: 192, + }, + }), + }; + const result = parseOpenAIStreamEvent(event, acc); + expect(result.usage?.cacheReadTokens).toBeUndefined(); + }); +}); + +// ── httpRequestStream ── + +describe('httpRequestStream', () => { + // We test httpRequestStream by mocking Node's http/https modules + // These tests verify the async iterable, error handling, and abort behavior + + // Helper to create a mock response + function createMockResponse(statusCode: number) { + const handlers: Record void)[]> = {}; + return { + statusCode, + headers: {} as Record, + on(event: string, handler: (...args: unknown[]) => void) { + if (!handlers[event]) handlers[event] = []; + handlers[event].push(handler); + return this; + }, + emit(event: string, ...args: unknown[]) { + for (const h of handlers[event] || []) h(...args); + }, + }; + } + + it('should be importable', async () => { + // Verify the function exists and has the right shape + const { httpRequestStream } = await import('../../src/main/engine/streaming'); + expect(typeof httpRequestStream).toBe('function'); + }); +}); From 5267ff77df2a62c98705430ea7a9c9383bbc5237 Mon Sep 17 00:00:00 2001 From: hugo 
Date: Sun, 1 Mar 2026 10:26:50 +0100 Subject: [PATCH 03/10] chore: updated agents stuff --- .vscode/settings.json | 4 +++- AGENTS.md | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c394720..6c83a94 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,6 +4,8 @@ "npx tsc": true, "git remote": true, "npx asar": true, - "npx tsx": true + "npx tsx": true, + "gh": true, + "git add": true } } \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md index a2c9b63..d9384f7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,6 +7,10 @@ This document provides context and best practices for GitHub Copilot when workin - Make the plan extremely concise. Sacrifice grammar for the sake of concision. - At the end of each plan, give me a list of unresolved questions to answer, if any. +## Commits + +- commit messages are short - one sentence. do not write long articles. + --- ## ⚠️ MANDATORY: Test-First Development From 2dec5592c923d8d10a29220baebe1e1972bbccf0 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 10:54:02 +0100 Subject: [PATCH 04/10] fix: SSE streaming review fixes round 2 --- .vscode/settings.json | 2 +- src/main/engine/OpenCodeManager.ts | 176 ++++++++++-------- src/main/engine/streaming.ts | 23 ++- tests/engine/streaming.test.ts | 285 ++++++++++++++++++++++++++--- 4 files changed, 380 insertions(+), 106 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 6c83a94..5e57c86 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,4 +8,4 @@ "gh": true, "git add": true } -} \ No newline at end of file +} diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 3545d8e..fd3eac8 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -473,6 +473,7 @@ export class OpenCodeManager { while (round < MAX_TOOL_ROUNDS) { round++; + if (signal.aborted) break; const body: Record = 
{ model: modelId, @@ -484,51 +485,57 @@ export class OpenCodeManager { cache_control: { type: 'ephemeral' }, }; - // Stream the response with retry for transient errors - const streamAccumulator = createAnthropicStreamAccumulator(); - let stopReason = ''; - let inputTokens = 0; - let outputTokens = 0; - let cacheReadTokens = 0; - let cacheWriteTokens = 0; - let roundText = ''; // Text produced in this round only + // Stream the response with retry for transient errors (including mid-stream failures) + const streamResult = await withRetry(async () => { + const streamAccumulator = createAnthropicStreamAccumulator(); + let stopReason = ''; + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let roundText = ''; - const { events } = await withRetry(() => httpRequestStream(ZEN_ANTHROPIC_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': this.apiKey, - 'Authorization': `Bearer ${this.apiKey}`, - 'anthropic-version': '2023-06-01', - }, - body: JSON.stringify(body), - signal, - })); + const { events } = await httpRequestStream(ZEN_ANTHROPIC_URL, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': this.apiKey, + 'Authorization': `Bearer ${this.apiKey}`, + 'anthropic-version': '2023-06-01', + }, + body: JSON.stringify(body), + signal, + }); - for await (const event of events) { - const result = parseAnthropicStreamEvent(event, streamAccumulator); + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); - // Emit text deltas immediately for real-time streaming - if (result.textDelta) { - accumulatedText += result.textDelta; - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } + + if (result.usage) { + if (result.usage.inputTokens 
!== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } + + if (result.finishReason) { + stopReason = result.finishReason; + } + + if (result.done) break; } - // Collect usage from message_start (input tokens) and message_delta (output tokens) - if (result.usage) { - if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; - if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; - } + return { roundText, stopReason, toolCalls: streamAccumulator.toolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; + }); - if (result.finishReason) { - stopReason = result.finishReason; - } - } + const { roundText, stopReason, toolCalls: streamToolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens } = streamResult; + accumulatedText += roundText; // Emit token usage after stream completes if (callbacks.onTokenUsage) { @@ -558,7 +565,7 @@ export class OpenCodeManager { // Collect tool calls from stream accumulator const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = []; - for (const [, tc] of streamAccumulator.toolCalls) { + for (const [, tc] of streamToolCalls) { try { toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); } catch { @@ -626,7 +633,8 @@ export class OpenCodeManager { continue; } - // Execute the tool + // Execute the tool (check abort before each tool execution) + if (signal.aborted) break; const result = await this.executeTool(toolName, 
toolArgs as Record); if (callbacks.onToolResult) { @@ -745,6 +753,7 @@ export class OpenCodeManager { while (round < MAX_TOOL_ROUNDS) { round++; + if (signal.aborted) break; const body: Record = { model: modelId, @@ -755,51 +764,55 @@ export class OpenCodeManager { stream_options: { include_usage: true }, }; - // Stream the response with retry for transient errors - const streamAccumulator = createOpenAIStreamAccumulator(); - let finishReason = ''; - let promptTokens = 0; - let completionTokens = 0; - let totalTokens = 0; - let cacheReadTokens = 0; - let roundText = ''; // Text produced in this round only + // Stream the response with retry for transient errors (including mid-stream failures) + const streamResult = await withRetry(async () => { + const streamAccumulator = createOpenAIStreamAccumulator(); + let finishReason = ''; + let promptTokens = 0; + let completionTokens = 0; + let totalTokens = 0; + let cacheReadTokens = 0; + let roundText = ''; - const { events } = await withRetry(() => httpRequestStream(ZEN_OPENAI_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${this.apiKey}`, - }, - body: JSON.stringify(body), - signal, - })); + const { events } = await httpRequestStream(ZEN_OPENAI_URL, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${this.apiKey}`, + }, + body: JSON.stringify(body), + signal, + }); - for await (const event of events) { - const result = parseOpenAIStreamEvent(event, streamAccumulator); + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); - // Emit text deltas immediately for real-time streaming - if (result.textDelta) { - accumulatedText += result.textDelta; - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } 
+ + if (result.usage) { + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } + + if (result.finishReason) { + finishReason = result.finishReason; + } + + if (result.done) break; } - // Collect usage from final chunk - if (result.usage) { - if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; - if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; - if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - } + return { roundText, finishReason, toolCalls: streamAccumulator.toolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens }; + }); - if (result.finishReason) { - finishReason = result.finishReason; - } - - if (result.done) break; - } + const { roundText, finishReason, toolCalls: streamToolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens } = streamResult; + accumulatedText += roundText; // Emit token usage after stream completes if (callbacks.onTokenUsage) { @@ -818,7 +831,8 @@ export class OpenCodeManager { this.conversationUsage.set(conversationId, cumulative); callbacks.onTokenUsage({ - inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0, + inputTokens, outputTokens, cacheReadTokens, + cacheWriteTokens: 0, // OpenAI streaming does not report cache write tokens totalTokens: totalTokens || inputTokens + outputTokens, cumulativeInputTokens: cumulative.inputTokens, cumulativeOutputTokens: cumulative.outputTokens, @@ -830,7 +844,7 @@ export class OpenCodeManager { // Collect tool calls from stream 
accumulator const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = []; - for (const [, tc] of streamAccumulator.toolCalls) { + for (const [, tc] of streamToolCalls) { try { parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); } catch { @@ -887,6 +901,8 @@ export class OpenCodeManager { continue; } + // Check abort before each tool execution + if (signal.aborted) break; const result = await this.executeTool(toolName, toolArgs as Record); if (callbacks.onToolResult) { diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 99c4d5b..1cc6017 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -93,9 +93,11 @@ export function parseSSELines(text: string): { events: SSEEvent[]; remaining: st if (line.startsWith(':')) continue; if (line.startsWith('event: ') || line.startsWith('event:')) { - eventType = line.slice(line.indexOf(':') + 1).trim(); + const afterColon = line.slice(line.indexOf(':') + 1); + eventType = afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon; } else if (line.startsWith('data: ') || line.startsWith('data:')) { - dataLines.push(line.slice(line.indexOf(':') + 1).trimStart()); + const afterColon = line.slice(line.indexOf(':') + 1); + dataLines.push(afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon); } } @@ -326,7 +328,7 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]); */ export async function withRetry( fn: () => Promise, - options: { maxRetries?: number } = {}, + options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void } = {}, ): Promise { const maxRetries = options.maxRetries ?? 
3; let lastError: Error | undefined; @@ -363,6 +365,10 @@ export async function withRetry( delay = Math.max(delay, httpError.retryAfter * 1000); } + if (options.onRetry) { + options.onRetry(attempt + 1, lastError); + } + await new Promise(resolve => setTimeout(resolve, delay)); } } @@ -446,6 +452,7 @@ export function httpRequestStream( [Symbol.asyncIterator]() { let buffer = ''; let done = false; + let pendingError: Error | null = null; const eventQueue: SSEEvent[] = []; let resolveNext: ((value: IteratorResult) => void) | null = null; let rejectNext: ((error: Error) => void) | null = null; @@ -484,6 +491,9 @@ export function httpRequestStream( resolveNext = null; rejectNext = null; reject(err); + } else { + // Store error for next .next() call so it's not silently swallowed + pendingError = err; } }); @@ -494,6 +504,13 @@ export function httpRequestStream( return Promise.resolve({ value: eventQueue.shift()!, done: false }); } + // Throw stored error from a previous event that fired with no consumer waiting + if (pendingError) { + const err = pendingError; + pendingError = null; + return Promise.reject(err); + } + // Stream already ended if (done) { return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true }); diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index 8f914e9..cd8ba00 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -10,12 +10,14 @@ * - Retry with exponential backoff (429/502/503, Retry-After, no retry on 4xx/abort) */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import http from 'http'; import { parseSSELines, parseOpenAIStreamEvent, parseAnthropicStreamEvent, withRetry, + httpRequestStream, type SSEEvent, type OpenAIStreamAccumulator, type AnthropicStreamAccumulator, @@ -838,29 +840,268 @@ describe('OpenAI cache token extraction', () => { // ── httpRequestStream ── 
describe('httpRequestStream', () => { - // We test httpRequestStream by mocking Node's http/https modules - // These tests verify the async iterable, error handling, and abort behavior + // Use a real HTTP server for integration tests (avoids ESM spyOn limitations) - // Helper to create a mock response - function createMockResponse(statusCode: number) { - const handlers: Record void)[]> = {}; - return { - statusCode, - headers: {} as Record, - on(event: string, handler: (...args: unknown[]) => void) { - if (!handlers[event]) handlers[event] = []; - handlers[event].push(handler); - return this; - }, - emit(event: string, ...args: unknown[]) { - for (const h of handlers[event] || []) h(...args); - }, - }; + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); } - it('should be importable', async () => { - // Verify the function exists and has the right shape - const { httpRequestStream } = await import('../../src/main/engine/streaming'); - expect(typeof httpRequestStream).toBe('function'); + it('parses streamed SSE events from response data chunks', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":" world"}}]}\n\n'); + res.write('data: [DONE]\n\n'); + res.end(); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const collected: SSEEvent[] = []; + for await (const event of events) { + collected.push(event); + } + 
expect(collected).toHaveLength(3); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"Hello"}}]}'); + expect(collected[1].data).toBe('{"choices":[{"delta":{"content":" world"}}]}'); + expect(collected[2].data).toBe('[DONE]'); + } finally { + await srv.close(); + } + }); + + it('collects error body and rejects on non-2xx status', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '5' }); + res.end(JSON.stringify({ error: { message: 'Rate limited' } })); + }); + + try { + await expect(httpRequestStream(srv.url, {})).rejects.toMatchObject({ + message: 'Rate limited', + statusCode: 429, + retryAfter: 5, + }); + } finally { + await srv.close(); + } + }); + + it('propagates mid-stream errors to async iterable consumer', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n'); + // Destroy the socket to simulate TCP disconnect + setTimeout(() => res.destroy(), 20); + }); + + try { + const { events } = await httpRequestStream(srv.url, {}); + const collected: SSEEvent[] = []; + await expect(async () => { + for await (const event of events) { + collected.push(event); + } + }).rejects.toThrow(); + + // Should have received the first event before the error + expect(collected).toHaveLength(1); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"Hi"}}]}'); + } finally { + await srv.close(); + } + }); + + it('propagates stored error when no consumer was waiting (pendingError fix)', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + // Send data and immediately destroy — error fires before consumer calls .next() + res.write('data: {"ok":true}\n\n'); + // Give a tiny delay so the data event fires first + setTimeout(() => res.destroy(), 5); + }); 
+ + try { + const { events } = await httpRequestStream(srv.url, {}); + const iter = events[Symbol.asyncIterator](); + + // Wait a bit for both data and error to fire + await new Promise(resolve => setTimeout(resolve, 50)); + + // First call should return the queued event + const first = await iter.next(); + expect(first.done).toBe(false); + expect(first.value.data).toBe('{"ok":true}'); + + // Second call should throw the stored (pending) error + await expect(iter.next()).rejects.toThrow(); + } finally { + await srv.close(); + } + }); + + it('handles already-aborted signal', async () => { + // No server needed — should reject immediately + const controller = new AbortController(); + controller.abort(); + + await expect(httpRequestStream('http://localhost:1/test', { + signal: controller.signal, + })).rejects.toMatchObject({ + isAbort: true, + }); + }); + + it('handles non-JSON error body', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(500, { 'Content-Type': 'text/plain' }); + res.end('Internal Server Error'); + }); + + try { + await expect(httpRequestStream(srv.url, {})).rejects.toMatchObject({ + statusCode: 500, + }); + } finally { + await srv.close(); + } + }); +}); + +// ── withRetry onRetry callback ── + +describe('withRetry onRetry callback', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('calls onRetry callback before each retry attempt', async () => { + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const onRetry = vi.fn(); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockRejectedValueOnce(error429) + .mockResolvedValue('success'); + + const promise = withRetry(fn, { maxRetries: 3, onRetry }); + await vi.advanceTimersByTimeAsync(10000); + const result = await promise; + + expect(result).toBe('success'); + expect(onRetry).toHaveBeenCalledTimes(2); + expect(onRetry).toHaveBeenCalledWith(1, error429); + 
expect(onRetry).toHaveBeenCalledWith(2, error429); + }); + + it('does not call onRetry when first attempt succeeds', async () => { + const onRetry = vi.fn(); + const fn = vi.fn().mockResolvedValue('ok'); + + const result = await withRetry(fn, { maxRetries: 3, onRetry }); + + expect(result).toBe('ok'); + expect(onRetry).not.toHaveBeenCalled(); + }); +}); + +// ── Mid-stream retry integration ── + +describe('mid-stream retry with withRetry', () => { + it('retries stream consumption on transient mid-stream error', async () => { + vi.useRealTimers(); + + let attempt = 0; + const fn = async () => { + attempt++; + if (attempt === 1) { + // First attempt: simulate partial stream then error + const error = Object.assign(new Error('Service temporarily unavailable'), { statusCode: 503 }); + throw error; + } + // Second attempt: succeed + return { text: 'Hello world!', toolCalls: [] }; + }; + + const result = await withRetry(fn, { maxRetries: 2 }); + expect(result).toEqual({ text: 'Hello world!', toolCalls: [] }); + expect(attempt).toBe(2); + }); + + it('retries on mid-stream TCP error (no status code)', async () => { + vi.useRealTimers(); + + let attempt = 0; + const fn = async () => { + attempt++; + if (attempt === 1) { + throw new Error('ECONNRESET'); + } + return 'recovered'; + }; + + const result = await withRetry(fn, { maxRetries: 2 }); + expect(result).toBe('recovered'); + expect(attempt).toBe(2); + }); + + it('does not retry mid-stream abort errors', async () => { + const abortError = Object.assign(new Error('Request cancelled'), { isAbort: true }); + + let attempt = 0; + const fn = async () => { + attempt++; + throw abortError; + }; + + await expect(withRetry(fn, { maxRetries: 3 })).rejects.toThrow('Request cancelled'); + expect(attempt).toBe(1); + }); +}); + +// ── SSE spec compliance ── + +describe('SSE spec compliance - single space removal', () => { + it('removes exactly one leading space after colon in data field', () => { + const chunk = 'data: {"key": 
"value"}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].data).toBe('{"key": "value"}'); + }); + + it('preserves data when no space after colon', () => { + const chunk = 'data:{"key":"value"}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].data).toBe('{"key":"value"}'); + }); + + it('preserves extra leading spaces after removing one', () => { + const chunk = 'data: two spaces\n\n'; + const { events } = parseSSELines(chunk); + // Per SSE spec: only one leading space is removed + expect(events[0].data).toBe(' two spaces'); + }); + + it('removes exactly one leading space from event type', () => { + const chunk = 'event: message_start\ndata: {}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].event).toBe('message_start'); + }); + + it('handles event type with no space after colon', () => { + const chunk = 'event:ping\ndata: {}\n\n'; + const { events } = parseSSELines(chunk); + expect(events[0].event).toBe('ping'); }); }); From bae229a9a2b69d1ea6dd19c5f74ff6cf1716a764 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 11:42:07 +0100 Subject: [PATCH 05/10] fix: scope retry to connection only, prevent onDelta double-emission --- src/main/engine/OpenCodeManager.ts | 158 +++++++++++++++-------------- src/main/engine/streaming.ts | 9 +- tests/engine/streaming.test.ts | 116 +++++++++++++++++++++ 3 files changed, 204 insertions(+), 79 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index fd3eac8..749deaf 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -13,14 +13,12 @@ import http from 'http'; import { URL } from 'url'; import { BrowserWindow } from 'electron'; import { - parseSSELines, parseAnthropicStreamEvent, parseOpenAIStreamEvent, createAnthropicStreamAccumulator, createOpenAIStreamAccumulator, httpRequestStream, withRetry, - type HttpStreamError, } from './streaming'; import { ChatEngine } from './ChatEngine'; 
import { PostEngine, type PostData } from './PostEngine'; @@ -485,17 +483,10 @@ export class OpenCodeManager { cache_control: { type: 'ephemeral' }, }; - // Stream the response with retry for transient errors (including mid-stream failures) - const streamResult = await withRetry(async () => { - const streamAccumulator = createAnthropicStreamAccumulator(); - let stopReason = ''; - let inputTokens = 0; - let outputTokens = 0; - let cacheReadTokens = 0; - let cacheWriteTokens = 0; - let roundText = ''; - - const { events } = await httpRequestStream(ZEN_ANTHROPIC_URL, { + // Retry only the HTTP connection (429/502/503 are caught before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. + const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_ANTHROPIC_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -506,35 +497,42 @@ export class OpenCodeManager { body: JSON.stringify(body), signal, }); - - for await (const event of events) { - const result = parseAnthropicStreamEvent(event, streamAccumulator); - - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); - } - } - - if (result.usage) { - if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; - if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; - } - - if (result.finishReason) { - stopReason = result.finishReason; - } - - if (result.done) break; - } - - return { roundText, stopReason, toolCalls: streamAccumulator.toolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; }); - const { roundText, stopReason, toolCalls: streamToolCalls, inputTokens, outputTokens, 
cacheReadTokens, cacheWriteTokens } = streamResult; + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createAnthropicStreamAccumulator(); + let stopReason = ''; + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let roundText = ''; + + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } + + if (result.finishReason) { + stopReason = result.finishReason; + } + + if (result.done) break; + } + + const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; // Emit token usage after stream completes @@ -681,6 +679,8 @@ export class OpenCodeManager { } } + if (signal.aborted) break; + // Add assistant response and tool results to messages for next round messages = [ ...messages, @@ -764,17 +764,10 @@ export class OpenCodeManager { stream_options: { include_usage: true }, }; - // Stream the response with retry for transient errors (including mid-stream failures) - const streamResult = await withRetry(async () => { - const streamAccumulator = createOpenAIStreamAccumulator(); - let finishReason = ''; - let promptTokens = 0; - let completionTokens = 0; - let totalTokens = 0; - let cacheReadTokens = 0; - let roundText = ''; - - const { events } = await httpRequestStream(ZEN_OPENAI_URL, { + // Retry only the HTTP connection (429/502/503 are caught 
before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. + const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_OPENAI_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -783,35 +776,42 @@ export class OpenCodeManager { body: JSON.stringify(body), signal, }); - - for await (const event of events) { - const result = parseOpenAIStreamEvent(event, streamAccumulator); - - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); - } - } - - if (result.usage) { - if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; - if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; - if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - } - - if (result.finishReason) { - finishReason = result.finishReason; - } - - if (result.done) break; - } - - return { roundText, finishReason, toolCalls: streamAccumulator.toolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens }; }); - const { roundText, finishReason, toolCalls: streamToolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens } = streamResult; + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createOpenAIStreamAccumulator(); + let finishReason = ''; + let promptTokens = 0; + let completionTokens = 0; + let totalTokens = 0; + let cacheReadTokens = 0; + let roundText = ''; + + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + if 
(result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } + + if (result.finishReason) { + finishReason = result.finishReason; + } + + if (result.done) break; + } + + const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; // Emit token usage after stream completes @@ -915,6 +915,8 @@ export class OpenCodeManager { tool_call_id: toolCall.id, }); } + + if (signal.aborted) break; } // Hit max rounds diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 1cc6017..9cb50a9 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -323,8 +323,15 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]); * Retry a function with exponential backoff for transient HTTP errors. * * Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable). - * Does NOT retry on other 4xx errors or abort. + * Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since + * these indicate transient network failures during connection. + * + * Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort. * Respects Retry-After header for 429 responses. + * + * Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry, + * NOT the event processing loop. This ensures onDelta callbacks are never + * called twice for the same text on retry. 
*/ export async function withRetry( fn: () => Promise, diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index cd8ba00..3adcb6d 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1071,6 +1071,122 @@ describe('mid-stream retry with withRetry', () => { }); }); +// ── Connection-only retry (no double-emission) ── + +describe('connection-only retry pattern (withRetry wrapping httpRequestStream)', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('retries 429 at connection time without emitting duplicate deltas', async () => { + let requestCount = 0; + const srv = await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '0' }); + res.end(JSON.stringify({ error: { message: 'Rate limited' } })); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":" world"}}]}\n\n'); + res.write('data: [DONE]\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + // Retry only the connection, process events outside retry + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + // Each delta appears exactly once — no 
double-emission + expect(deltas).toEqual(['Hello', ' world']); + expect(requestCount).toBe(2); // 1 failed + 1 success + } finally { + await srv.close(); + } + }); + + it('mid-stream TCP error propagates without retry when only connection is wrapped', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n'); + // Destroy socket to simulate mid-stream TCP disconnect + setTimeout(() => res.destroy(), 20); + }); + + try { + const deltas: string[] = []; + + // Only connection is retried — mid-stream errors propagate + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + await expect(async () => { + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + }).rejects.toThrow(); + + // Partial delta was received before the error — no duplication + expect(deltas).toEqual(['Hi']); + } finally { + await srv.close(); + } + }); + + it('retries 502 at connection time then streams successfully', async () => { + let requestCount = 0; + const srv = await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(502); + res.end('Bad Gateway'); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('event: message_start\ndata: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":10}}}\n\n'); + res.write('event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"OK"}}\n\n'); + res.write('event: message_stop\ndata: {"type":"message_stop"}\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const 
acc = createAnthropicStreamAccumulator(); + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + expect(deltas).toEqual(['OK']); + expect(requestCount).toBe(2); + } finally { + await srv.close(); + } + }); +}); + // ── SSE spec compliance ── describe('SSE spec compliance - single space removal', () => { From 5ea55f407daa54fa332f89b0a9248f08908a7d4d Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 11:42:20 +0100 Subject: [PATCH 06/10] chore: agent settings --- .vscode/settings.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 5e57c86..ddd3786 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,6 +6,8 @@ "npx asar": true, "npx tsx": true, "gh": true, - "git add": true + "git add": true, + "git commit": true, + "git push": true } } From 90f541cc843c227a94976ba23c4b9a439e988d60 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 11:51:36 +0100 Subject: [PATCH 07/10] fix: iterator cleanup, abort listener leak, token guard, tool parse errors --- src/main/engine/OpenCodeManager.ts | 54 ++++++++++++++++++----- src/main/engine/streaming.ts | 9 +++- tests/engine/streaming.test.ts | 70 ++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 749deaf..3b3e638 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -137,6 +137,7 @@ interface AnthropicContentBlock { input?: unknown; tool_use_id?: string; content?: string | AnthropicToolResultContent[]; + is_error?: boolean; source?: { type: 'base64'; media_type: string; @@ -535,8 +536,9 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; - // Emit token usage after stream completes - if 
(callbacks.onTokenUsage) { + // Emit token usage after stream completes (only when usage data was received) + const hasUsageData = inputTokens > 0 || outputTokens > 0; + if (callbacks.onTokenUsage && hasUsageData) { const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; const totalTokens = inputTokens + outputTokens; @@ -562,12 +564,13 @@ export class OpenCodeManager { } // Collect tool calls from stream accumulator - const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = []; + const toolUseBlocks: Array<{ id: string; name: string; input: unknown; parseError?: string }> = []; for (const [, tc] of streamToolCalls) { try { toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); - } catch { - toolUseBlocks.push({ id: tc.id, name: tc.name, input: {} }); + } catch (e) { + console.error(`[OpenCodeManager] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + toolUseBlocks.push({ id: tc.id, name: tc.name, input: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` }); } } @@ -608,6 +611,21 @@ export class OpenCodeManager { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolBlock.parseError) { + const errorResult = { error: true, message: toolBlock.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + toolResults.push({ + type: 'tool_result', + tool_use_id: toolUseId, + content: JSON.stringify(errorResult), + is_error: true, + }); + continue; + } + // Check if this is a render tool — generate A2UI messages instead of executing if (isRenderTool(toolName)) { const a2uiMessages = generateFromToolCall( @@ -814,8 +832,9 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; - // Emit token usage after stream completes - if (callbacks.onTokenUsage) { + // 
Emit token usage after stream completes (only when usage data was received) + const hasUsageData = promptTokens > 0 || completionTokens > 0; + if (callbacks.onTokenUsage && hasUsageData) { const inputTokens = promptTokens - cacheReadTokens; const outputTokens = completionTokens; @@ -843,12 +862,13 @@ export class OpenCodeManager { } // Collect tool calls from stream accumulator - const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = []; + const parsedToolCalls: Array<{ id: string; name: string; args: unknown; parseError?: string }> = []; for (const [, tc] of streamToolCalls) { try { parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); - } catch { - parsedToolCalls.push({ id: tc.id, name: tc.name, args: {} }); + } catch (e) { + console.error(`[OpenCodeManager:OpenAI] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + parsedToolCalls.push({ id: tc.id, name: tc.name, args: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` }); } } @@ -882,6 +902,20 @@ export class OpenCodeManager { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolCall.parseError) { + const errorResult = { error: true, message: toolCall.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + messages.push({ + role: 'tool', + content: JSON.stringify(errorResult), + tool_call_id: toolCall.id, + }); + continue; + } + // Check if this is a render tool if (isRenderTool(toolName)) { const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record); diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 9cb50a9..8f7618b 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -529,6 +529,13 @@ export function httpRequestStream( rejectNext = reject; }); }, + return(): Promise> { + // Called when 
for-await-of exits early (break, return, throw). + // Destroy the response stream to free the socket immediately. + done = true; + res.destroy(); + return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true }); + }, }; }, }; @@ -559,7 +566,7 @@ export function httpRequestStream( } options.signal.addEventListener('abort', () => { req.destroy(); - }); + }, { once: true }); } if (options.body) { diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index 3adcb6d..e839bbc 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1221,3 +1221,73 @@ describe('SSE spec compliance - single space removal', () => { expect(events[0].event).toBe('ping'); }); }); + +// ── Async iterator return() cleanup ── + +describe('async iterator return() cleanup', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('destroys response stream when for-await-of breaks early', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"A"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"B"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"C"}}]}\n\n'); + // Don't end the response — the client should destroy it via return() + // Keep connection alive for a bit + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const collected: SSEEvent[] = []; + for await (const event of events) { + 
collected.push(event); + if (collected.length === 1) break; // Early exit triggers return() + } + + expect(collected).toHaveLength(1); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"A"}}]}'); + } finally { + await srv.close(); + } + }); + + it('return() method signals done and is idempotent', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"ok":true}\n\n'); + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const iter = events[Symbol.asyncIterator](); + + // Consume one event + const first = await iter.next(); + expect(first.done).toBe(false); + + // Call return() explicitly + const returnResult = await iter.return!(undefined as unknown as SSEEvent); + expect(returnResult.done).toBe(true); + + // Subsequent next() should return done + const after = await iter.next(); + expect(after.done).toBe(true); + } finally { + await srv.close(); + } + }); +}); From 1d2eba81148d4e99c7c88369de7975260fe4e7e7 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 11:53:19 +0100 Subject: [PATCH 08/10] chore: agent instructions --- AGENTS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/AGENTS.md b/AGENTS.md index d9384f7..4e35d4a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -10,6 +10,7 @@ This document provides context and best practices for GitHub Copilot when workin ## Commits - commit messages are short - one sentence. do not write long articles. 
+- pull requests are more verbose and especialy give reasoning for changes --- From 72410b2973292af0f36c17389c493a1f4ff7081f Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 12:02:57 +0100 Subject: [PATCH 09/10] fix: thinking block support, roundText preservation, abort listener leak, typo --- AGENTS.md | 2 +- src/main/engine/OpenCodeManager.ts | 90 ++++++++++++++---------- src/main/engine/streaming.ts | 14 +++- tests/engine/streaming.test.ts | 108 +++++++++++++++++++++++++++++ 4 files changed, 173 insertions(+), 41 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 4e35d4a..9a4525e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -10,7 +10,7 @@ This document provides context and best practices for GitHub Copilot when workin ## Commits - commit messages are short - one sentence. do not write long articles. -- pull requests are more verbose and especialy give reasoning for changes +- pull requests are more verbose and especially give reasoning for changes --- diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 3b3e638..7d1bfd1 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -509,32 +509,37 @@ export class OpenCodeManager { let cacheWriteTokens = 0; let roundText = ''; - for await (const event of events) { - const result = parseAnthropicStreamEvent(event, streamAccumulator); + try { + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } - } - if (result.usage) { - if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - if (result.usage.cacheWriteTokens 
!== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; - if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; - } + if (result.usage) { + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } - if (result.finishReason) { - stopReason = result.finishReason; - } + if (result.finishReason) { + stopReason = result.finishReason; + } - if (result.done) break; + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } const streamToolCalls = streamAccumulator.toolCalls; - accumulatedText += roundText; + const streamThinkingBlocks = streamAccumulator.thinkingBlocks; // Emit token usage after stream completes (only when usage data was received) const hasUsageData = inputTokens > 0 || outputTokens > 0; @@ -587,6 +592,13 @@ export class OpenCodeManager { // Build assistant content blocks for the next message round const assistantContentBlocks: AnthropicContentBlock[] = []; + // Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled) + for (const [, tb] of streamThinkingBlocks) { + if (tb.text) { + assistantContentBlocks.push({ type: 'thinking', text: tb.text }); + } + } + // Add text block with text from this round if (roundText) { assistantContentBlocks.push({ type: 'text', text: roundText }); @@ -805,32 +817,36 @@ export class OpenCodeManager { let cacheReadTokens = 0; let roundText = ''; - for await (const event of events) { - const result = parseOpenAIStreamEvent(event, streamAccumulator); + try { + for await (const event of events) { + const result = 
parseOpenAIStreamEvent(event, streamAccumulator); - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } - } - if (result.usage) { - if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; - if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; - if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - } + if (result.usage) { + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } - if (result.finishReason) { - finishReason = result.finishReason; - } + if (result.finishReason) { + finishReason = result.finishReason; + } - if (result.done) break; + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } const streamToolCalls = streamAccumulator.toolCalls; - accumulatedText += roundText; // Emit token usage after stream completes (only when usage data was received) const hasUsageData = promptTokens > 0 || completionTokens > 0; @@ -2347,7 +2363,7 @@ Respond with JSON only: {"title": "...", "alt": "...", "caption": "..."}`; options.signal.addEventListener('abort', () => { req.destroy(); reject(new Error('Request cancelled')); - }); + }, { once: true }); } if (options.body) { diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 
8f7618b..8aec14e 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -50,6 +50,7 @@ export interface OpenAIStreamAccumulator { export interface AnthropicStreamAccumulator { toolCalls: Map; + thinkingBlocks: Map; } export interface HttpStreamError extends Error { @@ -119,7 +120,7 @@ export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator { } export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator { - return { toolCalls: new Map() }; + return { toolCalls: new Map(), thinkingBlocks: new Map() }; } // ── OpenAI/Mistral SSE Parser ── @@ -217,8 +218,8 @@ export function parseOpenAIStreamEvent( * * Anthropic streaming format uses named event types: * - message_start: input token usage - * - content_block_start: text or tool_use block begins - * - content_block_delta: text_delta or input_json_delta + * - content_block_start: text, tool_use, or thinking block begins + * - content_block_delta: text_delta, input_json_delta, or thinking_delta * - content_block_stop: block ends * - message_delta: output tokens + stop_reason * - message_stop: stream complete @@ -260,6 +261,8 @@ export function parseAnthropicStreamEvent( name: block.name, arguments: '', }); + } else if (block?.type === 'thinking') { + accumulator.thinkingBlocks.set(data.index as number, { text: '' }); } // text block start is a no-op (empty initial text) break; @@ -274,6 +277,11 @@ export function parseAnthropicStreamEvent( if (tc) { tc.arguments += delta.partial_json; } + } else if (delta?.type === 'thinking_delta' && delta.thinking) { + const tb = accumulator.thinkingBlocks.get(data.index as number); + if (tb) { + tb.text += delta.thinking; + } } break; } diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index e839bbc..3617ece 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -449,6 +449,114 @@ describe('parseAnthropicStreamEvent', () => { const result = 
parseAnthropicStreamEvent(event, accumulator); expect(result.finishReason).toBe('tool_use'); }); + + it('handles thinking content_block_start', () => { + const event: SSEEvent = { + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: '' }); + }); + + it('accumulates thinking_delta fragments', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // First thinking fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Let me think' }, + }), + }, accumulator); + + // Second thinking fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: ' about this...' 
}, + }), + }, accumulator); + + expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Let me think about this...'); + }); + + it('does not emit thinking_delta as textDelta', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + const result = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Internal reasoning' }, + }), + }, accumulator); + + // thinking_delta must NOT leak to textDelta — it's internal model reasoning + expect(result.textDelta).toBeUndefined(); + }); + + it('accumulates thinking and text blocks independently', () => { + // Thinking block at index 0 + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Reasoning...' 
}, + }), + }, accumulator); + + // Text block at index 1 + const textResult = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 1, + delta: { type: 'text_delta', text: 'Here is my answer' }, + }), + }, accumulator); + + // Thinking accumulated separately + expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Reasoning...'); + // Text still emitted as textDelta + expect(textResult.textDelta).toBe('Here is my answer'); + }); }); // ── Tool Call Accumulation ── From 2ddaad422f634e1ce914ad7cfa4d1777dbfecb6d Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 12:13:14 +0100 Subject: [PATCH 10/10] fix: thinking signature capture, abort-aware retry delay, usage tracking --- src/main/engine/OpenCodeManager.ts | 17 ++- src/main/engine/streaming.ts | 43 ++++++- tests/engine/streaming.test.ts | 197 +++++++++++++++++++++++++++++ 3 files changed, 248 insertions(+), 9 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 7d1bfd1..e24b0c5 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -138,6 +138,7 @@ interface AnthropicContentBlock { tool_use_id?: string; content?: string | AnthropicToolResultContent[]; is_error?: boolean; + signature?: string; source?: { type: 'base64'; media_type: string; @@ -508,6 +509,7 @@ export class OpenCodeManager { let cacheReadTokens = 0; let cacheWriteTokens = 0; let roundText = ''; + let receivedUsage = false; try { for await (const event of events) { @@ -521,6 +523,7 @@ export class OpenCodeManager { } if (result.usage) { + receivedUsage = true; if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; @@ -542,8 +545,7 @@ export class OpenCodeManager { const 
streamThinkingBlocks = streamAccumulator.thinkingBlocks; // Emit token usage after stream completes (only when usage data was received) - const hasUsageData = inputTokens > 0 || outputTokens > 0; - if (callbacks.onTokenUsage && hasUsageData) { + if (callbacks.onTokenUsage && receivedUsage) { const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; const totalTokens = inputTokens + outputTokens; @@ -595,7 +597,11 @@ export class OpenCodeManager { // Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled) for (const [, tb] of streamThinkingBlocks) { if (tb.text) { - assistantContentBlocks.push({ type: 'thinking', text: tb.text }); + const thinkingBlock: AnthropicContentBlock = { type: 'thinking', text: tb.text }; + if (tb.signature) { + thinkingBlock.signature = tb.signature; + } + assistantContentBlocks.push(thinkingBlock); } } @@ -816,6 +822,7 @@ export class OpenCodeManager { let totalTokens = 0; let cacheReadTokens = 0; let roundText = ''; + let receivedUsage = false; try { for await (const event of events) { @@ -829,6 +836,7 @@ export class OpenCodeManager { } if (result.usage) { + receivedUsage = true; if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; @@ -849,8 +857,7 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; // Emit token usage after stream completes (only when usage data was received) - const hasUsageData = promptTokens > 0 || completionTokens > 0; - if (callbacks.onTokenUsage && hasUsageData) { + if (callbacks.onTokenUsage && receivedUsage) { const inputTokens = promptTokens - cacheReadTokens; const outputTokens = completionTokens; diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 8aec14e..5f40ac8 100644 
--- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -50,7 +50,7 @@ export interface OpenAIStreamAccumulator { export interface AnthropicStreamAccumulator { toolCalls: Map; - thinkingBlocks: Map; + thinkingBlocks: Map; } export interface HttpStreamError extends Error { @@ -286,9 +286,18 @@ export function parseAnthropicStreamEvent( break; } - case 'content_block_stop': + case 'content_block_stop': { // Block is complete. Tool arguments can now be parsed by the caller. + // For thinking blocks, capture the signature (required by Anthropic when replaying thinking blocks). + const stopBlock = (data as any).content_block; + if (stopBlock?.type === 'thinking' && stopBlock.signature) { + const tb = accumulator.thinkingBlocks.get(data.index as number); + if (tb) { + tb.signature = stopBlock.signature; + } + } break; + } case 'message_delta': { if ((data as any).usage) { @@ -343,7 +352,7 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]); */ export async function withRetry( fn: () => Promise, - options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void } = {}, + options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void; signal?: AbortSignal } = {}, ): Promise { const maxRetries = options.maxRetries ?? 
3; let lastError: Error | undefined; @@ -360,6 +369,13 @@ export async function withRetry( throw error; } + // Check signal before retrying + if (options.signal?.aborted) { + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + throw abortError; + } + // Don't retry on non-retryable status codes if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) { throw error; @@ -384,7 +400,26 @@ export async function withRetry( options.onRetry(attempt + 1, lastError); } - await new Promise(resolve => setTimeout(resolve, delay)); + // Abort-aware delay: reject immediately if signal fires during wait + await new Promise((resolve, reject) => { + const timer = setTimeout(resolve, delay); + if (options.signal) { + const onAbort = () => { + clearTimeout(timer); + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + reject(abortError); + }; + if (options.signal.aborted) { + clearTimeout(timer); + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + reject(abortError); + return; + } + options.signal.addEventListener('abort', onAbort, { once: true }); + } + }); } } diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index 3617ece..77e5358 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1399,3 +1399,200 @@ describe('async iterator return() cleanup', () => { } }); }); + +// ── Thinking block signature capture ── + +describe('Anthropic thinking block signature', () => { + let accumulator: AnthropicStreamAccumulator; + + beforeEach(() => { + accumulator = createAnthropicStreamAccumulator(); + }); + + it('captures signature from content_block_stop for thinking blocks', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 
'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // Accumulate thinking + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Let me reason...' }, + }), + }, accumulator); + + // content_block_stop with signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + content_block: { + type: 'thinking', + thinking: 'Let me reason...', + signature: 'ErUBCkYIAxgCIkD+ybfICm10kSig...', + }, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.text).toBe('Let me reason...'); + expect(tb!.signature).toBe('ErUBCkYIAxgCIkD+ybfICm10kSig...'); + }); + + it('leaves signature undefined when content_block_stop has no signature', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // content_block_stop without signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.signature).toBeUndefined(); + }); + + it('does not affect tool_call blocks on content_block_stop', () => { + // Start tool_use block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' }, + }), + }, accumulator); + + // Tool argument fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + 
index: 0, + delta: { type: 'input_json_delta', partial_json: '{"query":"test"}' }, + }), + }, accumulator); + + // content_block_stop (no signature for tool blocks) + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + // Tool call should be unaffected + const tc = accumulator.toolCalls.get(0); + expect(tc).toBeDefined(); + expect(tc!.arguments).toBe('{"query":"test"}'); + }); + + it('full thinking sequence produces signature on accumulator', () => { + // Full realistic sequence: thinking block -> text block -> tool_use + // Thinking at index 0 + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 1. ' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 2.' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ type: 'content_block_stop', index: 0, content_block: { type: 'thinking', thinking: 'Step 1. Step 2.', signature: 'sig_abc123' } }), + }, accumulator); + + expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: 'Step 1. 
Step 2.', signature: 'sig_abc123' }); + }); +}); + +// ── withRetry abort-aware delay ── + +describe('withRetry abort during delay', () => { + it('rejects quickly when signal is aborted during retry delay', async () => { + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn().mockRejectedValue(error429); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + + // First attempt fails immediately, then enters retry delay. + // Wait a small amount for the first attempt to fail and delay to start. + await new Promise(r => setTimeout(r, 50)); + expect(fn).toHaveBeenCalledTimes(1); + + // Abort during the delay + controller.abort(); + + // Should reject with abort error, not wait for delay to finish + await expect(promise).rejects.toThrow(); + // Should NOT have made a second attempt — aborted during delay + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does not abort delay when no signal is provided', async () => { + vi.useFakeTimers(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); + + it('works normally when signal is not aborted', async () => { + vi.useFakeTimers(); + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + 
vi.useRealTimers(); + }); +});