fix: iterator cleanup, abort listener leak, token guard, tool parse errors

This commit is contained in:
2026-03-01 11:51:36 +01:00
parent 5ea55f407d
commit 90f541cc84
3 changed files with 122 additions and 11 deletions

View File

@@ -137,6 +137,7 @@ interface AnthropicContentBlock {
input?: unknown; input?: unknown;
tool_use_id?: string; tool_use_id?: string;
content?: string | AnthropicToolResultContent[]; content?: string | AnthropicToolResultContent[];
is_error?: boolean;
source?: { source?: {
type: 'base64'; type: 'base64';
media_type: string; media_type: string;
@@ -535,8 +536,9 @@ export class OpenCodeManager {
const streamToolCalls = streamAccumulator.toolCalls; const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText; accumulatedText += roundText;
// Emit token usage after stream completes // Emit token usage after stream completes (only when usage data was received)
if (callbacks.onTokenUsage) { const hasUsageData = inputTokens > 0 || outputTokens > 0;
if (callbacks.onTokenUsage && hasUsageData) {
const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens;
const totalTokens = inputTokens + outputTokens; const totalTokens = inputTokens + outputTokens;
@@ -562,12 +564,13 @@ export class OpenCodeManager {
} }
// Collect tool calls from stream accumulator // Collect tool calls from stream accumulator
const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = []; const toolUseBlocks: Array<{ id: string; name: string; input: unknown; parseError?: string }> = [];
for (const [, tc] of streamToolCalls) { for (const [, tc] of streamToolCalls) {
try { try {
toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) }); toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) });
} catch { } catch (e) {
toolUseBlocks.push({ id: tc.id, name: tc.name, input: {} }); console.error(`[OpenCodeManager] Failed to parse tool arguments for ${tc.name}:`, tc.arguments);
toolUseBlocks.push({ id: tc.id, name: tc.name, input: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` });
} }
} }
@@ -608,6 +611,21 @@ export class OpenCodeManager {
callbacks.onToolCall({ name: toolName, args: toolArgs }); callbacks.onToolCall({ name: toolName, args: toolArgs });
} }
// If JSON parsing of tool arguments failed, report the error to the model
if (toolBlock.parseError) {
const errorResult = { error: true, message: toolBlock.parseError };
if (callbacks.onToolResult) {
callbacks.onToolResult({ name: toolName, result: errorResult });
}
toolResults.push({
type: 'tool_result',
tool_use_id: toolUseId,
content: JSON.stringify(errorResult),
is_error: true,
});
continue;
}
// Check if this is a render tool — generate A2UI messages instead of executing // Check if this is a render tool — generate A2UI messages instead of executing
if (isRenderTool(toolName)) { if (isRenderTool(toolName)) {
const a2uiMessages = generateFromToolCall( const a2uiMessages = generateFromToolCall(
@@ -814,8 +832,9 @@ export class OpenCodeManager {
const streamToolCalls = streamAccumulator.toolCalls; const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText; accumulatedText += roundText;
// Emit token usage after stream completes // Emit token usage after stream completes (only when usage data was received)
if (callbacks.onTokenUsage) { const hasUsageData = promptTokens > 0 || completionTokens > 0;
if (callbacks.onTokenUsage && hasUsageData) {
const inputTokens = promptTokens - cacheReadTokens; const inputTokens = promptTokens - cacheReadTokens;
const outputTokens = completionTokens; const outputTokens = completionTokens;
@@ -843,12 +862,13 @@ export class OpenCodeManager {
} }
// Collect tool calls from stream accumulator // Collect tool calls from stream accumulator
const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = []; const parsedToolCalls: Array<{ id: string; name: string; args: unknown; parseError?: string }> = [];
for (const [, tc] of streamToolCalls) { for (const [, tc] of streamToolCalls) {
try { try {
parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) }); parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) });
} catch { } catch (e) {
parsedToolCalls.push({ id: tc.id, name: tc.name, args: {} }); console.error(`[OpenCodeManager:OpenAI] Failed to parse tool arguments for ${tc.name}:`, tc.arguments);
parsedToolCalls.push({ id: tc.id, name: tc.name, args: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` });
} }
} }
@@ -882,6 +902,20 @@ export class OpenCodeManager {
callbacks.onToolCall({ name: toolName, args: toolArgs }); callbacks.onToolCall({ name: toolName, args: toolArgs });
} }
// If JSON parsing of tool arguments failed, report the error to the model
if (toolCall.parseError) {
const errorResult = { error: true, message: toolCall.parseError };
if (callbacks.onToolResult) {
callbacks.onToolResult({ name: toolName, result: errorResult });
}
messages.push({
role: 'tool',
content: JSON.stringify(errorResult),
tool_call_id: toolCall.id,
});
continue;
}
// Check if this is a render tool // Check if this is a render tool
if (isRenderTool(toolName)) { if (isRenderTool(toolName)) {
const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record<string, unknown>); const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record<string, unknown>);

View File

@@ -529,6 +529,13 @@ export function httpRequestStream(
rejectNext = reject; rejectNext = reject;
}); });
}, },
return(): Promise<IteratorResult<SSEEvent>> {
// Called when for-await-of exits early (break, return, throw).
// Destroy the response stream to free the socket immediately.
done = true;
res.destroy();
return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });
},
}; };
}, },
}; };
@@ -559,7 +566,7 @@ export function httpRequestStream(
} }
options.signal.addEventListener('abort', () => { options.signal.addEventListener('abort', () => {
req.destroy(); req.destroy();
}); }, { once: true });
} }
if (options.body) { if (options.body) {

View File

@@ -1221,3 +1221,73 @@ describe('SSE spec compliance - single space removal', () => {
expect(events[0].event).toBe('ping'); expect(events[0].event).toBe('ping');
}); });
}); });
// ── Async iterator return() cleanup ──
describe('async iterator return() cleanup', () => {
function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise<void> }> {
return new Promise((resolve) => {
const server = http.createServer(handler);
server.listen(0, () => {
const addr = server.address() as { port: number };
resolve({
url: `http://localhost:${addr.port}`,
close: () => new Promise<void>((r) => server.close(() => r())),
});
});
});
}
it('destroys response stream when for-await-of breaks early', async () => {
const srv = await startTestServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'text/event-stream' });
res.write('data: {"choices":[{"delta":{"content":"A"}}]}\n\n');
res.write('data: {"choices":[{"delta":{"content":"B"}}]}\n\n');
res.write('data: {"choices":[{"delta":{"content":"C"}}]}\n\n');
// Don't end the response — the client should destroy it via return()
// Keep connection alive for a bit
setTimeout(() => res.end(), 5000);
});
try {
const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' });
const collected: SSEEvent[] = [];
for await (const event of events) {
collected.push(event);
if (collected.length === 1) break; // Early exit triggers return()
}
expect(collected).toHaveLength(1);
expect(collected[0].data).toBe('{"choices":[{"delta":{"content":"A"}}]}');
} finally {
await srv.close();
}
});
it('return() method signals done and is idempotent', async () => {
const srv = await startTestServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'text/event-stream' });
res.write('data: {"ok":true}\n\n');
setTimeout(() => res.end(), 5000);
});
try {
const { events } = await httpRequestStream(srv.url, { method: 'POST', body: '{}' });
const iter = events[Symbol.asyncIterator]();
// Consume one event
const first = await iter.next();
expect(first.done).toBe(false);
// Call return() explicitly
const returnResult = await iter.return!(undefined as unknown as SSEEvent);
expect(returnResult.done).toBe(true);
// Subsequent next() should return done
const after = await iter.next();
expect(after.done).toBe(true);
} finally {
await srv.close();
}
});
});