From 2ddaad422f634e1ce914ad7cfa4d1777dbfecb6d Mon Sep 17 00:00:00 2001 From: hugo Date: Sun, 1 Mar 2026 12:13:14 +0100 Subject: [PATCH] fix: thinking signature capture, abort-aware retry delay, usage tracking --- src/main/engine/OpenCodeManager.ts | 17 ++- src/main/engine/streaming.ts | 43 ++++++- tests/engine/streaming.test.ts | 197 +++++++++++++++++++++++++++++ 3 files changed, 248 insertions(+), 9 deletions(-) diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index 7d1bfd1..e24b0c5 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -138,6 +138,7 @@ interface AnthropicContentBlock { tool_use_id?: string; content?: string | AnthropicToolResultContent[]; is_error?: boolean; + signature?: string; source?: { type: 'base64'; media_type: string; @@ -508,6 +509,7 @@ export class OpenCodeManager { let cacheReadTokens = 0; let cacheWriteTokens = 0; let roundText = ''; + let receivedUsage = false; try { for await (const event of events) { @@ -521,6 +523,7 @@ export class OpenCodeManager { } if (result.usage) { + receivedUsage = true; if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; @@ -542,8 +545,7 @@ export class OpenCodeManager { const streamThinkingBlocks = streamAccumulator.thinkingBlocks; // Emit token usage after stream completes (only when usage data was received) - const hasUsageData = inputTokens > 0 || outputTokens > 0; - if (callbacks.onTokenUsage && hasUsageData) { + if (callbacks.onTokenUsage && receivedUsage) { const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens; const totalTokens = inputTokens + outputTokens; @@ -595,7 +597,11 @@ export class OpenCodeManager { // Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled) for (const [, tb] of streamThinkingBlocks) { if (tb.text) { - assistantContentBlocks.push({ type: 'thinking', text: tb.text }); + const thinkingBlock: AnthropicContentBlock = { type: 'thinking', text: tb.text }; + if (tb.signature) { + thinkingBlock.signature = tb.signature; + } + assistantContentBlocks.push(thinkingBlock); } } @@ -816,6 +822,7 @@ export class OpenCodeManager { let totalTokens = 0; let cacheReadTokens = 0; let roundText = ''; + let receivedUsage = false; try { for await (const event of events) { @@ -829,6 +836,7 @@ export class OpenCodeManager { } if (result.usage) { + receivedUsage = true; if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; @@ -849,8 +857,7 @@ export class OpenCodeManager { const streamToolCalls = streamAccumulator.toolCalls; // Emit token usage after stream completes (only when usage data was received) - const hasUsageData = promptTokens > 0 || completionTokens > 0; - if (callbacks.onTokenUsage && hasUsageData) { + if (callbacks.onTokenUsage && receivedUsage) { const inputTokens = promptTokens - cacheReadTokens; const outputTokens = completionTokens; diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 8aec14e..5f40ac8 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -50,7 +50,7 @@ export interface OpenAIStreamAccumulator { export interface AnthropicStreamAccumulator { toolCalls: Map; - thinkingBlocks: Map; + thinkingBlocks: Map; } export interface HttpStreamError extends Error { @@ -286,9 +286,18 @@ export function parseAnthropicStreamEvent( break; } - case 'content_block_stop': + case 'content_block_stop': { // Block is complete. Tool arguments can now be parsed by the caller. + // For thinking blocks, capture the signature (required by Anthropic when replaying thinking blocks). + const stopBlock = (data as any).content_block; + if (stopBlock?.type === 'thinking' && stopBlock.signature) { + const tb = accumulator.thinkingBlocks.get(data.index as number); + if (tb) { + tb.signature = stopBlock.signature; + } + } break; + } case 'message_delta': { if ((data as any).usage) { @@ -343,7 +352,7 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]); */ export async function withRetry( fn: () => Promise, - options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void } = {}, + options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void; signal?: AbortSignal } = {}, ): Promise { const maxRetries = options.maxRetries ?? 3; let lastError: Error | undefined; @@ -360,6 +369,13 @@ export async function withRetry( throw error; } + // Check signal before retrying + if (options.signal?.aborted) { + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + throw abortError; + } + // Don't retry on non-retryable status codes if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) { throw error; @@ -384,7 +400,26 @@ export async function withRetry( options.onRetry(attempt + 1, lastError); } - await new Promise(resolve => setTimeout(resolve, delay)); + // Abort-aware delay: reject immediately if signal fires during wait + await new Promise((resolve, reject) => { + const timer = setTimeout(resolve, delay); + if (options.signal) { + const onAbort = () => { + clearTimeout(timer); + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + reject(abortError); + }; + if (options.signal.aborted) { + clearTimeout(timer); + const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError; + abortError.isAbort = true; + reject(abortError); + return; + } + options.signal.addEventListener('abort', onAbort, { once: true }); + } + }); } } diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index 3617ece..77e5358 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1399,3 +1399,200 @@ describe('async iterator return() cleanup', () => { } }); }); + +// ── Thinking block signature capture ── + +describe('Anthropic thinking block signature', () => { + let accumulator: AnthropicStreamAccumulator; + + beforeEach(() => { + accumulator = createAnthropicStreamAccumulator(); + }); + + it('captures signature from content_block_stop for thinking blocks', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // Accumulate thinking + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'thinking_delta', thinking: 'Let me reason...' }, + }), + }, accumulator); + + // content_block_stop with signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + content_block: { + type: 'thinking', + thinking: 'Let me reason...', + signature: 'ErUBCkYIAxgCIkD+ybfICm10kSig...', + }, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.text).toBe('Let me reason...'); + expect(tb!.signature).toBe('ErUBCkYIAxgCIkD+ybfICm10kSig...'); + }); + + it('leaves signature undefined when content_block_stop has no signature', () => { + // Start thinking block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'thinking', thinking: '' }, + }), + }, accumulator); + + // content_block_stop without signature + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + const tb = accumulator.thinkingBlocks.get(0); + expect(tb).toBeDefined(); + expect(tb!.signature).toBeUndefined(); + }); + + it('does not affect tool_call blocks on content_block_stop', () => { + // Start tool_use block + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ + type: 'content_block_start', + index: 0, + content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' }, + }), + }, accumulator); + + // Tool argument fragment + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ + type: 'content_block_delta', + index: 0, + delta: { type: 'input_json_delta', partial_json: '{"query":"test"}' }, + }), + }, accumulator); + + // content_block_stop (no signature for tool blocks) + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ + type: 'content_block_stop', + index: 0, + }), + }, accumulator); + + // Tool call should be unaffected + const tc = accumulator.toolCalls.get(0); + expect(tc).toBeDefined(); + expect(tc!.arguments).toBe('{"query":"test"}'); + }); + + it('full thinking sequence produces signature on accumulator', () => { + // Full realistic sequence: thinking block -> text block -> tool_use + // Thinking at index 0 + parseAnthropicStreamEvent({ + event: 'content_block_start', + data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 1. ' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_delta', + data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 2.' } }), + }, accumulator); + parseAnthropicStreamEvent({ + event: 'content_block_stop', + data: JSON.stringify({ type: 'content_block_stop', index: 0, content_block: { type: 'thinking', thinking: 'Step 1. Step 2.', signature: 'sig_abc123' } }), + }, accumulator); + + expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: 'Step 1. Step 2.', signature: 'sig_abc123' }); + }); +}); + +// ── withRetry abort-aware delay ── + +describe('withRetry abort during delay', () => { + it('rejects quickly when signal is aborted during retry delay', async () => { + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn().mockRejectedValue(error429); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + + // First attempt fails immediately, then enters retry delay. + // Wait a small amount for the first attempt to fail and delay to start. + await new Promise(r => setTimeout(r, 50)); + expect(fn).toHaveBeenCalledTimes(1); + + // Abort during the delay + controller.abort(); + + // Should reject with abort error, not wait for delay to finish + await expect(promise).rejects.toThrow(); + // Should NOT have made a second attempt — aborted during delay + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('does not abort delay when no signal is provided', async () => { + vi.useFakeTimers(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { maxRetries: 3 }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); + + it('works normally when signal is not aborted', async () => { + vi.useFakeTimers(); + const controller = new AbortController(); + const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 }); + const fn = vi.fn() + .mockRejectedValueOnce(error429) + .mockResolvedValue('ok'); + + const promise = withRetry(fn, { + maxRetries: 3, + signal: controller.signal, + }); + await vi.advanceTimersByTimeAsync(2000); + const result = await promise; + expect(result).toBe('ok'); + expect(fn).toHaveBeenCalledTimes(2); + vi.useRealTimers(); + }); +});