fix: thinking signature capture, abort-aware retry delay, usage tracking
This commit is contained in:
@@ -138,6 +138,7 @@ interface AnthropicContentBlock {
|
|||||||
tool_use_id?: string;
|
tool_use_id?: string;
|
||||||
content?: string | AnthropicToolResultContent[];
|
content?: string | AnthropicToolResultContent[];
|
||||||
is_error?: boolean;
|
is_error?: boolean;
|
||||||
|
signature?: string;
|
||||||
source?: {
|
source?: {
|
||||||
type: 'base64';
|
type: 'base64';
|
||||||
media_type: string;
|
media_type: string;
|
||||||
@@ -508,6 +509,7 @@ export class OpenCodeManager {
|
|||||||
let cacheReadTokens = 0;
|
let cacheReadTokens = 0;
|
||||||
let cacheWriteTokens = 0;
|
let cacheWriteTokens = 0;
|
||||||
let roundText = '';
|
let roundText = '';
|
||||||
|
let receivedUsage = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for await (const event of events) {
|
for await (const event of events) {
|
||||||
@@ -521,6 +523,7 @@ export class OpenCodeManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (result.usage) {
|
if (result.usage) {
|
||||||
|
receivedUsage = true;
|
||||||
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
|
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
|
||||||
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
|
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
|
||||||
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
|
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
|
||||||
@@ -542,8 +545,7 @@ export class OpenCodeManager {
|
|||||||
const streamThinkingBlocks = streamAccumulator.thinkingBlocks;
|
const streamThinkingBlocks = streamAccumulator.thinkingBlocks;
|
||||||
|
|
||||||
// Emit token usage after stream completes (only when usage data was received)
|
// Emit token usage after stream completes (only when usage data was received)
|
||||||
const hasUsageData = inputTokens > 0 || outputTokens > 0;
|
if (callbacks.onTokenUsage && receivedUsage) {
|
||||||
if (callbacks.onTokenUsage && hasUsageData) {
|
|
||||||
const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens;
|
const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens;
|
||||||
const totalTokens = inputTokens + outputTokens;
|
const totalTokens = inputTokens + outputTokens;
|
||||||
|
|
||||||
@@ -595,7 +597,11 @@ export class OpenCodeManager {
|
|||||||
// Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled)
|
// Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled)
|
||||||
for (const [, tb] of streamThinkingBlocks) {
|
for (const [, tb] of streamThinkingBlocks) {
|
||||||
if (tb.text) {
|
if (tb.text) {
|
||||||
assistantContentBlocks.push({ type: 'thinking', text: tb.text });
|
const thinkingBlock: AnthropicContentBlock = { type: 'thinking', text: tb.text };
|
||||||
|
if (tb.signature) {
|
||||||
|
thinkingBlock.signature = tb.signature;
|
||||||
|
}
|
||||||
|
assistantContentBlocks.push(thinkingBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -816,6 +822,7 @@ export class OpenCodeManager {
|
|||||||
let totalTokens = 0;
|
let totalTokens = 0;
|
||||||
let cacheReadTokens = 0;
|
let cacheReadTokens = 0;
|
||||||
let roundText = '';
|
let roundText = '';
|
||||||
|
let receivedUsage = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for await (const event of events) {
|
for await (const event of events) {
|
||||||
@@ -829,6 +836,7 @@ export class OpenCodeManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (result.usage) {
|
if (result.usage) {
|
||||||
|
receivedUsage = true;
|
||||||
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
|
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
|
||||||
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
|
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
|
||||||
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
|
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
|
||||||
@@ -849,8 +857,7 @@ export class OpenCodeManager {
|
|||||||
const streamToolCalls = streamAccumulator.toolCalls;
|
const streamToolCalls = streamAccumulator.toolCalls;
|
||||||
|
|
||||||
// Emit token usage after stream completes (only when usage data was received)
|
// Emit token usage after stream completes (only when usage data was received)
|
||||||
const hasUsageData = promptTokens > 0 || completionTokens > 0;
|
if (callbacks.onTokenUsage && receivedUsage) {
|
||||||
if (callbacks.onTokenUsage && hasUsageData) {
|
|
||||||
const inputTokens = promptTokens - cacheReadTokens;
|
const inputTokens = promptTokens - cacheReadTokens;
|
||||||
const outputTokens = completionTokens;
|
const outputTokens = completionTokens;
|
||||||
|
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ export interface OpenAIStreamAccumulator {
|
|||||||
|
|
||||||
export interface AnthropicStreamAccumulator {
|
export interface AnthropicStreamAccumulator {
|
||||||
toolCalls: Map<number, ToolCallAccumulator>;
|
toolCalls: Map<number, ToolCallAccumulator>;
|
||||||
thinkingBlocks: Map<number, { text: string }>;
|
thinkingBlocks: Map<number, { text: string; signature?: string }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface HttpStreamError extends Error {
|
export interface HttpStreamError extends Error {
|
||||||
@@ -286,9 +286,18 @@ export function parseAnthropicStreamEvent(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case 'content_block_stop':
|
case 'content_block_stop': {
|
||||||
// Block is complete. Tool arguments can now be parsed by the caller.
|
// Block is complete. Tool arguments can now be parsed by the caller.
|
||||||
|
// For thinking blocks, capture the signature (required by Anthropic when replaying thinking blocks).
|
||||||
|
const stopBlock = (data as any).content_block;
|
||||||
|
if (stopBlock?.type === 'thinking' && stopBlock.signature) {
|
||||||
|
const tb = accumulator.thinkingBlocks.get(data.index as number);
|
||||||
|
if (tb) {
|
||||||
|
tb.signature = stopBlock.signature;
|
||||||
|
}
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case 'message_delta': {
|
case 'message_delta': {
|
||||||
if ((data as any).usage) {
|
if ((data as any).usage) {
|
||||||
@@ -343,7 +352,7 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);
|
|||||||
*/
|
*/
|
||||||
export async function withRetry<T>(
|
export async function withRetry<T>(
|
||||||
fn: () => Promise<T>,
|
fn: () => Promise<T>,
|
||||||
options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void } = {},
|
options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void; signal?: AbortSignal } = {},
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
const maxRetries = options.maxRetries ?? 3;
|
const maxRetries = options.maxRetries ?? 3;
|
||||||
let lastError: Error | undefined;
|
let lastError: Error | undefined;
|
||||||
@@ -360,6 +369,13 @@ export async function withRetry<T>(
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check signal before retrying
|
||||||
|
if (options.signal?.aborted) {
|
||||||
|
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||||
|
abortError.isAbort = true;
|
||||||
|
throw abortError;
|
||||||
|
}
|
||||||
|
|
||||||
// Don't retry on non-retryable status codes
|
// Don't retry on non-retryable status codes
|
||||||
if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) {
|
if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) {
|
||||||
throw error;
|
throw error;
|
||||||
@@ -384,7 +400,26 @@ export async function withRetry<T>(
|
|||||||
options.onRetry(attempt + 1, lastError);
|
options.onRetry(attempt + 1, lastError);
|
||||||
}
|
}
|
||||||
|
|
||||||
await new Promise(resolve => setTimeout(resolve, delay));
|
// Abort-aware delay: reject immediately if signal fires during wait
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const timer = setTimeout(resolve, delay);
|
||||||
|
if (options.signal) {
|
||||||
|
const onAbort = () => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||||
|
abortError.isAbort = true;
|
||||||
|
reject(abortError);
|
||||||
|
};
|
||||||
|
if (options.signal.aborted) {
|
||||||
|
clearTimeout(timer);
|
||||||
|
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||||
|
abortError.isAbort = true;
|
||||||
|
reject(abortError);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
options.signal.addEventListener('abort', onAbort, { once: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1399,3 +1399,200 @@ describe('async iterator return() cleanup', () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── Thinking block signature capture ──
|
||||||
|
|
||||||
|
describe('Anthropic thinking block signature', () => {
|
||||||
|
let accumulator: AnthropicStreamAccumulator;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
accumulator = createAnthropicStreamAccumulator();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('captures signature from content_block_stop for thinking blocks', () => {
|
||||||
|
// Start thinking block
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_start',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_start',
|
||||||
|
index: 0,
|
||||||
|
content_block: { type: 'thinking', thinking: '' },
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// Accumulate thinking
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_delta',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_delta',
|
||||||
|
index: 0,
|
||||||
|
delta: { type: 'thinking_delta', thinking: 'Let me reason...' },
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// content_block_stop with signature
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_stop',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_stop',
|
||||||
|
index: 0,
|
||||||
|
content_block: {
|
||||||
|
type: 'thinking',
|
||||||
|
thinking: 'Let me reason...',
|
||||||
|
signature: 'ErUBCkYIAxgCIkD+ybfICm10kSig...',
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
const tb = accumulator.thinkingBlocks.get(0);
|
||||||
|
expect(tb).toBeDefined();
|
||||||
|
expect(tb!.text).toBe('Let me reason...');
|
||||||
|
expect(tb!.signature).toBe('ErUBCkYIAxgCIkD+ybfICm10kSig...');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('leaves signature undefined when content_block_stop has no signature', () => {
|
||||||
|
// Start thinking block
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_start',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_start',
|
||||||
|
index: 0,
|
||||||
|
content_block: { type: 'thinking', thinking: '' },
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// content_block_stop without signature
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_stop',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_stop',
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
const tb = accumulator.thinkingBlocks.get(0);
|
||||||
|
expect(tb).toBeDefined();
|
||||||
|
expect(tb!.signature).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not affect tool_call blocks on content_block_stop', () => {
|
||||||
|
// Start tool_use block
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_start',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_start',
|
||||||
|
index: 0,
|
||||||
|
content_block: { type: 'tool_use', id: 'toolu_1', name: 'search_posts' },
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// Tool argument fragment
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_delta',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_delta',
|
||||||
|
index: 0,
|
||||||
|
delta: { type: 'input_json_delta', partial_json: '{"query":"test"}' },
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// content_block_stop (no signature for tool blocks)
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_stop',
|
||||||
|
data: JSON.stringify({
|
||||||
|
type: 'content_block_stop',
|
||||||
|
index: 0,
|
||||||
|
}),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
// Tool call should be unaffected
|
||||||
|
const tc = accumulator.toolCalls.get(0);
|
||||||
|
expect(tc).toBeDefined();
|
||||||
|
expect(tc!.arguments).toBe('{"query":"test"}');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('full thinking sequence produces signature on accumulator', () => {
|
||||||
|
// Full realistic sequence: thinking block -> text block -> tool_use
|
||||||
|
// Thinking at index 0
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_start',
|
||||||
|
data: JSON.stringify({ type: 'content_block_start', index: 0, content_block: { type: 'thinking', thinking: '' } }),
|
||||||
|
}, accumulator);
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_delta',
|
||||||
|
data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 1. ' } }),
|
||||||
|
}, accumulator);
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_delta',
|
||||||
|
data: JSON.stringify({ type: 'content_block_delta', index: 0, delta: { type: 'thinking_delta', thinking: 'Step 2.' } }),
|
||||||
|
}, accumulator);
|
||||||
|
parseAnthropicStreamEvent({
|
||||||
|
event: 'content_block_stop',
|
||||||
|
data: JSON.stringify({ type: 'content_block_stop', index: 0, content_block: { type: 'thinking', thinking: 'Step 1. Step 2.', signature: 'sig_abc123' } }),
|
||||||
|
}, accumulator);
|
||||||
|
|
||||||
|
expect(accumulator.thinkingBlocks.get(0)).toEqual({ text: 'Step 1. Step 2.', signature: 'sig_abc123' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── withRetry abort-aware delay ──
|
||||||
|
|
||||||
|
describe('withRetry abort during delay', () => {
|
||||||
|
it('rejects quickly when signal is aborted during retry delay', async () => {
|
||||||
|
const controller = new AbortController();
|
||||||
|
const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 });
|
||||||
|
const fn = vi.fn().mockRejectedValue(error429);
|
||||||
|
|
||||||
|
const promise = withRetry(fn, {
|
||||||
|
maxRetries: 3,
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
// First attempt fails immediately, then enters retry delay.
|
||||||
|
// Wait a small amount for the first attempt to fail and delay to start.
|
||||||
|
await new Promise(r => setTimeout(r, 50));
|
||||||
|
expect(fn).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Abort during the delay
|
||||||
|
controller.abort();
|
||||||
|
|
||||||
|
// Should reject with abort error, not wait for delay to finish
|
||||||
|
await expect(promise).rejects.toThrow();
|
||||||
|
// Should NOT have made a second attempt — aborted during delay
|
||||||
|
expect(fn).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not abort delay when no signal is provided', async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 });
|
||||||
|
const fn = vi.fn()
|
||||||
|
.mockRejectedValueOnce(error429)
|
||||||
|
.mockResolvedValue('ok');
|
||||||
|
|
||||||
|
const promise = withRetry(fn, { maxRetries: 3 });
|
||||||
|
await vi.advanceTimersByTimeAsync(2000);
|
||||||
|
const result = await promise;
|
||||||
|
expect(result).toBe('ok');
|
||||||
|
expect(fn).toHaveBeenCalledTimes(2);
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('works normally when signal is not aborted', async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
const controller = new AbortController();
|
||||||
|
const error429 = Object.assign(new Error('Rate limited'), { statusCode: 429 });
|
||||||
|
const fn = vi.fn()
|
||||||
|
.mockRejectedValueOnce(error429)
|
||||||
|
.mockResolvedValue('ok');
|
||||||
|
|
||||||
|
const promise = withRetry(fn, {
|
||||||
|
maxRetries: 3,
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
await vi.advanceTimersByTimeAsync(2000);
|
||||||
|
const result = await promise;
|
||||||
|
expect(result).toBe('ok');
|
||||||
|
expect(fn).toHaveBeenCalledTimes(2);
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user