From 72410b2973292af0f36c17389c493a1f4ff7081f Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 12:02:57 +0100 Subject: [PATCH] fix: thinking block support, roundText preservation, abort listener leak, typo --- AGENTS.md | 2 +- src/main/engine/OpenCodeManager.ts | 90 ++++++++++++++---------- src/main/engine/streaming.ts | 14 +++- tests/engine/streaming.test.ts | 108 +++++++++++++++++++++++++++++ 4 files changed, 173 insertions(+), 41 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 4e35d4a..9a4525e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -10,7 +10,7 @@ This document provides context and best practices for GitHub Copilot when workin ## Commits - commit messages are short - one sentence. do not write long articles. -- pull requests are more verbose and especialy give reasoning for changes +- pull requests are more verbose and especially give reasoning for changes --- diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 3b3e638..7d1bfd1 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -509,32 +509,37 @@ export class OpenCodeManager { let cacheWriteTokens = 0; let roundText = ''; - for await (const event of events) { - const result = parseAnthropicStreamEvent(event, streamAccumulator); + try { + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } - } - if (result.usage) { - if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; - if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; - } + if (result.usage) { + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } - if (result.finishReason) { - stopReason = result.finishReason; - } + if (result.finishReason) { + stopReason = result.finishReason; + } - if (result.done) break; + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } const streamToolCalls = streamAccumulator.toolCalls; - accumulatedText += roundText; + const streamThinkingBlocks = streamAccumulator.thinkingBlocks; // Emit token usage after stream completes (only when usage data was received) const hasUsageData = inputTokens > 0 || outputTokens > 0; @@ -587,6 +592,13 @@ export class OpenCodeManager { // Build assistant content blocks for the next message round const assistantContentBlocks: AnthropicContentBlock[] = []; + // Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled) + for (const [, tb] of streamThinkingBlocks) { + if (tb.text) { + assistantContentBlocks.push({ type: 'thinking', text: tb.text }); + } + } + // Add text block with text from this round if (roundText) { assistantContentBlocks.push({ type: 'text', text: roundText }); @@ -805,32 +817,36 @@ export class OpenCodeManager { let cacheReadTokens = 0; let roundText = ''; - for await (const event of events) { - const result = parseOpenAIStreamEvent(event, streamAccumulator); + try { + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } } - } - if (result.usage) { - if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; - if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; - if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - } + if (result.usage) { + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } - if (result.finishReason) { - finishReason = result.finishReason; - } + if (result.finishReason) { + finishReason = result.finishReason; + } - if (result.done) break; + if (result.done) break; + } + } finally { + // Preserve text already emitted via onDelta even if the stream errors mid-round + accumulatedText += roundText; } const streamToolCalls = streamAccumulator.toolCalls; - accumulatedText += roundText; // Emit token usage after stream completes (only when usage data was received) const hasUsageData = promptTokens > 0 || completionTokens > 0; @@ -2347,7 +2363,7 @@ Respond with JSON only: {"title": "...", "alt": "...", "caption": "..."}`; options.signal.addEventListener('abort', () => { req.destroy(); reject(new Error('Request cancelled')); - }); + }, { once: true }); } if (options.body) { diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 8f7618b..8aec14e 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -50,6 +50,7 @@ export interface OpenAIStreamAccumulator { export interface AnthropicStreamAccumulator { toolCalls: Map; + thinkingBlocks: Map; } export interface HttpStreamError extends Error { @@ -119,7 +120,7 @@ export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator { } export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator { - return { toolCalls: new Map() }; + return { toolCalls: new Map(), thinkingBlocks: new Map() }; } // ── OpenAI/Mistral SSE Parser ── @@ -217,8 +218,8 @@ export function parseOpenAIStreamEvent( * * Anthropic streaming format uses named event types: * - message_start: input token usage - * - content_block_start: text or tool_use block begins - * - content_block_delta: text_delta or input_json_delta + * - content_block_start: text, tool_use, or thinking block begins + * - content_block_delta: text_delta, input_json_delta, or thinking_delta * - content_block_stop: block ends * - message_delta: output tokens + stop_reason * - message_stop: stream complete @@ -260,6 +261,8 @@ export function parseAnthropicStreamEvent( name: block.name, arguments: '', }); + } else if (block?.type === 'thinking') { + accumulator.thinkingBlocks.set(data.index as number, { text: '' }); } // text block start is a no-op (empty initial text) break; @@ -274,6 +277,11 @@ export function parseAnthropicStreamEvent( if (tc) { tc.arguments += delta.partial_json; } + } else if (delta?.type === 'thinking_delta' && delta.thinking) { + const tb = accumulator.thinkingBlocks.get(data.index as number); + if (tb) { + tb.text += delta.thinking; + } } break; } diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index e839bbc..3617ece 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -449,6 +449,114 @@ describe('parseAnthropicStreamEvent', () => { const result = parseAnthropicStreamEvent(event, accumulator); expect(result.finishReason).toBe('tool_use'); }); + + it('handles thinking content_block_start', () => { + const event: SSEEvent = { + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }; + const result = parseAnthropicStreamEvent(event, accumulator); + expect(result.textDelta).toBeUndefined(); + expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: '' }); + }); + + it('accumulates thinking_delta fragments', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // First thinking fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Let me think' }, + }), + }, accumulator); + + // Second thinking fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: ' about this...' }, + }), + }, accumulator); + + expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Let me think about this...'); + }); + + it('does not emit thinking_delta as textDelta', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + const result = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Internal reasoning' }, + }), + }, accumulator); + + // thinking_delta must NOT leak to textDelta — it's internal model reasoning + expect(result.textDelta).toBeUndefined(); + }); + + it('accumulates thinking and text blocks independently', () => { + // Thinking block at index 0 + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Reasoning...' }, + }), + }, accumulator); + + // Text block at index 1 + const textResult = parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 1, + delta: { type: 'text_delta', text: 'Here is my answer' }, + }), + }, accumulator); + + // Thinking accumulated separately + expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Reasoning...'); + // Text still emitted as textDelta + expect(textResult.textDelta).toBe('Here is my answer'); + }); }); // ── Tool Call Accumulation ──