fix: thinking block support, roundText preservation, abort listener leak, typo

This commit is contained in:
2026-03-01 12:02:57 +01:00
parent 1d2eba8114
commit 72410b2973
4 changed files with 173 additions and 41 deletions

View File

@@ -10,7 +10,7 @@ This document provides context and best practices for GitHub Copilot when workin
## Commits
- commit messages are short - one sentence. do not write long articles.
- pull requests are more verbose and especialy give reasoning for changes
- pull requests are more verbose and especially give reasoning for changes
---

View File

@@ -509,32 +509,37 @@ export class OpenCodeManager {
let cacheWriteTokens = 0;
let roundText = '';
for await (const event of events) {
const result = parseAnthropicStreamEvent(event, streamAccumulator);
try {
for await (const event of events) {
const result = parseAnthropicStreamEvent(event, streamAccumulator);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
}
}
}
if (result.usage) {
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens;
}
if (result.usage) {
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens;
}
if (result.finishReason) {
stopReason = result.finishReason;
}
if (result.finishReason) {
stopReason = result.finishReason;
}
if (result.done) break;
if (result.done) break;
}
} finally {
// Preserve text already emitted via onDelta even if the stream errors mid-round
accumulatedText += roundText;
}
const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText;
const streamThinkingBlocks = streamAccumulator.thinkingBlocks;
// Emit token usage after stream completes (only when usage data was received)
const hasUsageData = inputTokens > 0 || outputTokens > 0;
@@ -587,6 +592,13 @@ export class OpenCodeManager {
// Build assistant content blocks for the next message round
const assistantContentBlocks: AnthropicContentBlock[] = [];
// Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled)
for (const [, tb] of streamThinkingBlocks) {
if (tb.text) {
assistantContentBlocks.push({ type: 'thinking', text: tb.text });
}
}
// Add text block with text from this round
if (roundText) {
assistantContentBlocks.push({ type: 'text', text: roundText });
@@ -805,32 +817,36 @@ export class OpenCodeManager {
let cacheReadTokens = 0;
let roundText = '';
for await (const event of events) {
const result = parseOpenAIStreamEvent(event, streamAccumulator);
try {
for await (const event of events) {
const result = parseOpenAIStreamEvent(event, streamAccumulator);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
}
}
}
if (result.usage) {
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
}
if (result.usage) {
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
}
if (result.finishReason) {
finishReason = result.finishReason;
}
if (result.finishReason) {
finishReason = result.finishReason;
}
if (result.done) break;
if (result.done) break;
}
} finally {
// Preserve text already emitted via onDelta even if the stream errors mid-round
accumulatedText += roundText;
}
const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText;
// Emit token usage after stream completes (only when usage data was received)
const hasUsageData = promptTokens > 0 || completionTokens > 0;
@@ -2347,7 +2363,7 @@ Respond with JSON only: {"title": "...", "alt": "...", "caption": "..."}`;
options.signal.addEventListener('abort', () => {
req.destroy();
reject(new Error('Request cancelled'));
});
}, { once: true });
}
if (options.body) {

View File

@@ -50,6 +50,7 @@ export interface OpenAIStreamAccumulator {
export interface AnthropicStreamAccumulator {
toolCalls: Map<number, ToolCallAccumulator>;
thinkingBlocks: Map<number, { text: string }>;
}
export interface HttpStreamError extends Error {
@@ -119,7 +120,7 @@ export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator {
}
export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator {
return { toolCalls: new Map() };
return { toolCalls: new Map(), thinkingBlocks: new Map() };
}
// ── OpenAI/Mistral SSE Parser ──
@@ -217,8 +218,8 @@ export function parseOpenAIStreamEvent(
*
* Anthropic streaming format uses named event types:
* - message_start: input token usage
* - content_block_start: text or tool_use block begins
* - content_block_delta: text_delta or input_json_delta
* - content_block_start: text, tool_use, or thinking block begins
* - content_block_delta: text_delta, input_json_delta, or thinking_delta
* - content_block_stop: block ends
* - message_delta: output tokens + stop_reason
* - message_stop: stream complete
@@ -260,6 +261,8 @@ export function parseAnthropicStreamEvent(
name: block.name,
arguments: '',
});
} else if (block?.type === 'thinking') {
accumulator.thinkingBlocks.set(data.index as number, { text: '' });
}
// text block start is a no-op (empty initial text)
break;
@@ -274,6 +277,11 @@ export function parseAnthropicStreamEvent(
if (tc) {
tc.arguments += delta.partial_json;
}
} else if (delta?.type === 'thinking_delta' && delta.thinking) {
const tb = accumulator.thinkingBlocks.get(data.index as number);
if (tb) {
tb.text += delta.thinking;
}
}
break;
}

View File

@@ -449,6 +449,114 @@ describe('parseAnthropicStreamEvent', () => {
const result = parseAnthropicStreamEvent(event, accumulator);
expect(result.finishReason).toBe('tool_use');
});
it('handles thinking content_block_start', () => {
const event: SSEEvent = {
event: 'content_block_start',
data: JSON.stringify({
type: 'content_block_start',
index: 0,
content_block: { type: 'thinking', thinking: '' },
}),
};
const result = parseAnthropicStreamEvent(event, accumulator);
expect(result.textDelta).toBeUndefined();
expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: '' });
});
it('accumulates thinking_delta fragments', () => {
// Start thinking block
parseAnthropicStreamEvent({
event: 'content_block_start',
data: JSON.stringify({
type: 'content_block_start',
index: 0,
content_block: { type: 'thinking', thinking: '' },
}),
}, accumulator);
// First thinking fragment
parseAnthropicStreamEvent({
event: 'content_block_delta',
data: JSON.stringify({
type: 'content_block_delta',
index: 0,
delta: { type: 'thinking_delta', thinking: 'Let me think' },
}),
}, accumulator);
// Second thinking fragment
parseAnthropicStreamEvent({
event: 'content_block_delta',
data: JSON.stringify({
type: 'content_block_delta',
index: 0,
delta: { type: 'thinking_delta', thinking: ' about this...' },
}),
}, accumulator);
expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Let me think about this...');
});
it('does not emit thinking_delta as textDelta', () => {
// Start thinking block
parseAnthropicStreamEvent({
event: 'content_block_start',
data: JSON.stringify({
type: 'content_block_start',
index: 0,
content_block: { type: 'thinking', thinking: '' },
}),
}, accumulator);
const result = parseAnthropicStreamEvent({
event: 'content_block_delta',
data: JSON.stringify({
type: 'content_block_delta',
index: 0,
delta: { type: 'thinking_delta', thinking: 'Internal reasoning' },
}),
}, accumulator);
// thinking_delta must NOT leak to textDelta — it's internal model reasoning
expect(result.textDelta).toBeUndefined();
});
it('accumulates thinking and text blocks independently', () => {
// Thinking block at index 0
parseAnthropicStreamEvent({
event: 'content_block_start',
data: JSON.stringify({
type: 'content_block_start',
index: 0,
content_block: { type: 'thinking', thinking: '' },
}),
}, accumulator);
parseAnthropicStreamEvent({
event: 'content_block_delta',
data: JSON.stringify({
type: 'content_block_delta',
index: 0,
delta: { type: 'thinking_delta', thinking: 'Reasoning...' },
}),
}, accumulator);
// Text block at index 1
const textResult = parseAnthropicStreamEvent({
event: 'content_block_delta',
data: JSON.stringify({
type: 'content_block_delta',
index: 1,
delta: { type: 'text_delta', text: 'Here is my answer' },
}),
}, accumulator);
// Thinking accumulated separately
expect(accumulator.thinkingBlocks.get(0)?.text).toBe('Reasoning...');
// Text still emitted as textDelta
expect(textResult.textDelta).toBe('Here is my answer');
});
});
// ── Tool Call Accumulation ──