From 90f541cc843c227a94976ba23c4b9a439e988d60 Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 11:51:36 +0100 Subject: [PATCH] fix: iterator cleanup, abort listener leak, token guard, tool parse errors --- src/main/engine/OpenCodeManager.ts | 54 ++++++++++++++++++----- src/main/engine/streaming.ts | 9 +++- tests/engine/streaming.test.ts | 70 ++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 749deaf..3b3e638 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -137,6 +137,7 @@ interface AnthropicContentBlock { input?: unknown; tool_use_id?: string; content?: string | AnthropicToolResultContent[]; + is_error?: boolean; source?: { type: 'base64'; media_type: string; @@ -535,8 +536,9 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; - // Emit token usage after stream completes - if (callbacks.onTokenUsage) { + // Emit token usage after stream completes (only when usage data was received) + const hasUsageData = inputTokens > 0 || outputTokens > 0; + if (callbacks.onTokenUsage && hasUsageData) { const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; const totalTokens = inputTokens + outputTokens; @@ -562,12 +564,13 @@ export class OpenCodeManager { } // Collect tool calls from stream accumulator - const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = []; + const toolUseBlocks: Array<{ id: string; name: string; input: unknown; parseError?: string }> = []; for (const [, tc] of streamToolCalls) { try { toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); - } catch { - toolUseBlocks.push({ id: tc.id, name: tc.name, input: {} }); + } catch (e) { + console.error(`[OpenCodeManager] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + toolUseBlocks.push({ id: tc.id, name: tc.name, input: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` }); } } @@ -608,6 +611,21 @@ export class OpenCodeManager { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolBlock.parseError) { + const errorResult = { error: true, message: toolBlock.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + toolResults.push({ + type: 'tool_result', + tool_use_id: toolUseId, + content: JSON.stringify(errorResult), + is_error: true, + }); + continue; + } + // Check if this is a render tool — generate A2UI messages instead of executing if (isRenderTool(toolName)) { const a2uiMessages = generateFromToolCall( @@ -814,8 +832,9 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; - // Emit token usage after stream completes - if (callbacks.onTokenUsage) { + // Emit token usage after stream completes (only when usage data was received) + const hasUsageData = promptTokens > 0 || completionTokens > 0; + if (callbacks.onTokenUsage && hasUsageData) { const inputTokens = promptTokens - cacheReadTokens; const outputTokens = completionTokens; @@ -843,12 +862,13 @@ export class OpenCodeManager { } // Collect tool calls from stream accumulator - const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = []; + const parsedToolCalls: Array<{ id: string; name: string; args: unknown; parseError?: string }> = []; for (const [, tc] of streamToolCalls) { try { parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); - } catch { - parsedToolCalls.push({ id: tc.id, name: tc.name, args: {} }); + } catch (e) { + console.error(`[OpenCodeManager:OpenAI] Failed to parse tool arguments for ${tc.name}:`, tc.arguments); + parsedToolCalls.push({ id: tc.id, name: tc.name, args: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` }); } } @@ -882,6 +902,20 @@ export class OpenCodeManager { callbacks.onToolCall({ name: toolName, args: toolArgs }); } + // If JSON parsing of tool arguments failed, report the error to the model + if (toolCall.parseError) { + const errorResult = { error: true, message: toolCall.parseError }; + if (callbacks.onToolResult) { + callbacks.onToolResult({ name: toolName, result: errorResult }); + } + messages.push({ + role: 'tool', + content: JSON.stringify(errorResult), + tool_call_id: toolCall.id, + }); + continue; + } + // Check if this is a render tool if (isRenderTool(toolName)) { const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record); diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 9cb50a9..8f7618b 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -529,6 +529,13 @@ export function httpRequestStream( rejectNext = reject; }); }, + return(): Promise> { + // Called when for-await-of exits early (break, return, throw). + // Destroy the response stream to free the socket immediately. + done = true; + res.destroy(); + return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true }); + }, }; }, }; @@ -559,7 +566,7 @@ export function httpRequestStream( } options.signal.addEventListener('abort', () => { req.destroy(); - }); + }, { once: true }); } if (options.body) { diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index 3adcb6d..e839bbc 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1221,3 +1221,73 @@ describe('SSE spec compliance - single space removal', () => { expect(events[0].event).toBe('ping'); }); }); + +// ── Async iterator return() cleanup ── + +describe('async iterator return() cleanup', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('destroys response stream when for-await-of breaks early', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"A"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"B"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":"C"}}]}\n\n'); + // Don't end the response — the client should destroy it via return() + // Keep connection alive for a bit + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const collected: SSEEvent[] = []; + for await (const event of events) { + collected.push(event); + if (collected.length === 1) break; // Early exit triggers return() + } + + expect(collected).toHaveLength(1); + expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"A"}}]}'); + } finally { + await srv.close(); + } + }); + + it('return() method signals done and is idempotent', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"ok":true}\n\n'); + setTimeout(() => res.end(), 5000); + }); + + try { + const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' }); + const iter = events[Symbol.asyncIterator](); + + // Consume one event + const first = await iter.next(); + expect(first.done).toBe(false); + + // Call return() explicitly + const returnResult = await iter.return!(undefined as unknown as SSEEvent); + expect(returnResult.done).toBe(true); + + // Subsequent next() should return done + const after = await iter.next(); + expect(after.done).toBe(true); + } finally { + await srv.close(); + } + }); +});