fix: SSE streaming review fixes round 2

This commit is contained in:
2026-03-01 10:54:02 +01:00
parent 5267ff77df
commit 2dec5592c9
4 changed files with 380 additions and 106 deletions

View File

@@ -473,6 +473,7 @@ export class OpenCodeManager {
while (round < MAX_TOOL_ROUNDS) {
round++;
if (signal.aborted) break;
const body: Record<string, unknown> = {
model: modelId,
@@ -484,51 +485,57 @@ export class OpenCodeManager {
cache_control: { type: 'ephemeral' },
};
// Stream the response with retry for transient errors
const streamAccumulator = createAnthropicStreamAccumulator();
let stopReason = '';
let inputTokens = 0;
let outputTokens = 0;
let cacheReadTokens = 0;
let cacheWriteTokens = 0;
let roundText = ''; // Text produced in this round only
// Stream the response with retry for transient errors (including mid-stream failures)
const streamResult = await withRetry(async () => {
const streamAccumulator = createAnthropicStreamAccumulator();
let stopReason = '';
let inputTokens = 0;
let outputTokens = 0;
let cacheReadTokens = 0;
let cacheWriteTokens = 0;
let roundText = '';
const { events } = await withRetry(() => httpRequestStream(ZEN_ANTHROPIC_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': this.apiKey,
'Authorization': `Bearer ${this.apiKey}`,
'anthropic-version': '2023-06-01',
},
body: JSON.stringify(body),
signal,
}));
const { events } = await httpRequestStream(ZEN_ANTHROPIC_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': this.apiKey,
'Authorization': `Bearer ${this.apiKey}`,
'anthropic-version': '2023-06-01',
},
body: JSON.stringify(body),
signal,
});
for await (const event of events) {
const result = parseAnthropicStreamEvent(event, streamAccumulator);
for await (const event of events) {
const result = parseAnthropicStreamEvent(event, streamAccumulator);
// Emit text deltas immediately for real-time streaming
if (result.textDelta) {
accumulatedText += result.textDelta;
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
}
}
if (result.usage) {
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens;
}
if (result.finishReason) {
stopReason = result.finishReason;
}
if (result.done) break;
}
// Collect usage from message_start (input tokens) and message_delta (output tokens)
if (result.usage) {
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens;
}
return { roundText, stopReason, toolCalls: streamAccumulator.toolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens };
});
if (result.finishReason) {
stopReason = result.finishReason;
}
}
const { roundText, stopReason, toolCalls: streamToolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens } = streamResult;
accumulatedText += roundText;
// Emit token usage after stream completes
if (callbacks.onTokenUsage) {
@@ -558,7 +565,7 @@ export class OpenCodeManager {
// Collect tool calls from stream accumulator
const toolUseBlocks: Array<{ id: string; name: string; input: unknown }> = [];
for (const [, tc] of streamAccumulator.toolCalls) {
for (const [, tc] of streamToolCalls) {
try {
toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) });
} catch {
@@ -626,7 +633,8 @@ export class OpenCodeManager {
continue;
}
// Execute the tool
// Execute the tool (check abort before each tool execution)
if (signal.aborted) break;
const result = await this.executeTool(toolName, toolArgs as Record<string, unknown>);
if (callbacks.onToolResult) {
@@ -745,6 +753,7 @@ export class OpenCodeManager {
while (round < MAX_TOOL_ROUNDS) {
round++;
if (signal.aborted) break;
const body: Record<string, unknown> = {
model: modelId,
@@ -755,51 +764,55 @@ export class OpenCodeManager {
stream_options: { include_usage: true },
};
// Stream the response with retry for transient errors
const streamAccumulator = createOpenAIStreamAccumulator();
let finishReason = '';
let promptTokens = 0;
let completionTokens = 0;
let totalTokens = 0;
let cacheReadTokens = 0;
let roundText = ''; // Text produced in this round only
// Stream the response with retry for transient errors (including mid-stream failures)
const streamResult = await withRetry(async () => {
const streamAccumulator = createOpenAIStreamAccumulator();
let finishReason = '';
let promptTokens = 0;
let completionTokens = 0;
let totalTokens = 0;
let cacheReadTokens = 0;
let roundText = '';
const { events } = await withRetry(() => httpRequestStream(ZEN_OPENAI_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
},
body: JSON.stringify(body),
signal,
}));
const { events } = await httpRequestStream(ZEN_OPENAI_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
},
body: JSON.stringify(body),
signal,
});
for await (const event of events) {
const result = parseOpenAIStreamEvent(event, streamAccumulator);
for await (const event of events) {
const result = parseOpenAIStreamEvent(event, streamAccumulator);
// Emit text deltas immediately for real-time streaming
if (result.textDelta) {
accumulatedText += result.textDelta;
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
if (result.textDelta) {
roundText += result.textDelta;
if (callbacks.onDelta) {
callbacks.onDelta(result.textDelta);
}
}
if (result.usage) {
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
}
if (result.finishReason) {
finishReason = result.finishReason;
}
if (result.done) break;
}
// Collect usage from final chunk
if (result.usage) {
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
}
return { roundText, finishReason, toolCalls: streamAccumulator.toolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens };
});
if (result.finishReason) {
finishReason = result.finishReason;
}
if (result.done) break;
}
const { roundText, finishReason, toolCalls: streamToolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens } = streamResult;
accumulatedText += roundText;
// Emit token usage after stream completes
if (callbacks.onTokenUsage) {
@@ -818,7 +831,8 @@ export class OpenCodeManager {
this.conversationUsage.set(conversationId, cumulative);
callbacks.onTokenUsage({
inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0,
inputTokens, outputTokens, cacheReadTokens,
cacheWriteTokens: 0, // OpenAI streaming does not report cache write tokens
totalTokens: totalTokens || inputTokens + outputTokens,
cumulativeInputTokens: cumulative.inputTokens,
cumulativeOutputTokens: cumulative.outputTokens,
@@ -830,7 +844,7 @@ export class OpenCodeManager {
// Collect tool calls from stream accumulator
const parsedToolCalls: Array<{ id: string; name: string; args: unknown }> = [];
for (const [, tc] of streamAccumulator.toolCalls) {
for (const [, tc] of streamToolCalls) {
try {
parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) });
} catch {
@@ -887,6 +901,8 @@ export class OpenCodeManager {
continue;
}
// Check abort before each tool execution
if (signal.aborted) break;
const result = await this.executeTool(toolName, toolArgs as Record<string, unknown>);
if (callbacks.onToolResult) {

View File

@@ -93,9 +93,11 @@ export function parseSSELines(text: string): { events: SSEEvent[]; remaining: st
if (line.startsWith(':')) continue;
if (line.startsWith('event: ') || line.startsWith('event:')) {
eventType = line.slice(line.indexOf(':') + 1).trim();
const afterColon = line.slice(line.indexOf(':') + 1);
eventType = afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon;
} else if (line.startsWith('data: ') || line.startsWith('data:')) {
dataLines.push(line.slice(line.indexOf(':') + 1).trimStart());
const afterColon = line.slice(line.indexOf(':') + 1);
dataLines.push(afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon);
}
}
@@ -326,7 +328,7 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);
*/
export async function withRetry<T>(
fn: () => Promise<T>,
options: { maxRetries?: number } = {},
options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void } = {},
): Promise<T> {
const maxRetries = options.maxRetries ?? 3;
let lastError: Error | undefined;
@@ -363,6 +365,10 @@ export async function withRetry<T>(
delay = Math.max(delay, httpError.retryAfter * 1000);
}
if (options.onRetry) {
options.onRetry(attempt + 1, lastError);
}
await new Promise(resolve => setTimeout(resolve, delay));
}
}
@@ -446,6 +452,7 @@ export function httpRequestStream(
[Symbol.asyncIterator]() {
let buffer = '';
let done = false;
let pendingError: Error | null = null;
const eventQueue: SSEEvent[] = [];
let resolveNext: ((value: IteratorResult<SSEEvent>) => void) | null = null;
let rejectNext: ((error: Error) => void) | null = null;
@@ -484,6 +491,9 @@ export function httpRequestStream(
resolveNext = null;
rejectNext = null;
reject(err);
} else {
// Store error for next .next() call so it's not silently swallowed
pendingError = err;
}
});
@@ -494,6 +504,13 @@ export function httpRequestStream(
return Promise.resolve({ value: eventQueue.shift()!, done: false });
}
// Reject with an error that was stored earlier because it arrived while no consumer was awaiting next()
if (pendingError) {
const err = pendingError;
pendingError = null;
return Promise.reject(err);
}
// Stream already ended
if (done) {
return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });