fix: scope retry to connection only, prevent onDelta double-emission

This commit is contained in:
2026-03-01 11:42:07 +01:00
parent 2dec5592c9
commit bae229a9a2
3 changed files with 204 additions and 79 deletions

View File

@@ -13,14 +13,12 @@ import http from 'http';
import { URL } from 'url';
import { BrowserWindow } from 'electron';
import {
parseSSELines,
parseAnthropicStreamEvent,
parseOpenAIStreamEvent,
createAnthropicStreamAccumulator,
createOpenAIStreamAccumulator,
httpRequestStream,
withRetry,
type HttpStreamError,
} from './streaming';
import { ChatEngine } from './ChatEngine';
import { PostEngine, type PostData } from './PostEngine';
@@ -485,17 +483,10 @@ export class OpenCodeManager {
cache_control: { type: 'ephemeral' },
};
// Stream the response with retry for transient errors (including mid-stream failures)
const streamResult = await withRetry(async () => {
const streamAccumulator = createAnthropicStreamAccumulator();
let stopReason = '';
let inputTokens = 0;
let outputTokens = 0;
let cacheReadTokens = 0;
let cacheWriteTokens = 0;
let roundText = '';
const { events } = await httpRequestStream(ZEN_ANTHROPIC_URL, {
// Retry only the HTTP connection (429/502/503 are caught before any events are emitted).
// Event processing is outside retry scope to prevent double-emission of onDelta on retry.
const { events } = await withRetry(async () => {
return httpRequestStream(ZEN_ANTHROPIC_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -506,6 +497,16 @@ export class OpenCodeManager {
body: JSON.stringify(body),
signal,
});
});
// Process stream events outside retry scope — onDelta is never called twice for the same text
const streamAccumulator = createAnthropicStreamAccumulator();
let stopReason = '';
let inputTokens = 0;
let outputTokens = 0;
let cacheReadTokens = 0;
let cacheWriteTokens = 0;
let roundText = '';
for await (const event of events) {
const result = parseAnthropicStreamEvent(event, streamAccumulator);
@@ -531,10 +532,7 @@ export class OpenCodeManager {
if (result.done) break;
}
return { roundText, stopReason, toolCalls: streamAccumulator.toolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens };
});
const { roundText, stopReason, toolCalls: streamToolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens } = streamResult;
const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText;
// Emit token usage after stream completes
@@ -681,6 +679,8 @@ export class OpenCodeManager {
}
}
if (signal.aborted) break;
// Add assistant response and tool results to messages for next round
messages = [
...messages,
@@ -764,17 +764,10 @@ export class OpenCodeManager {
stream_options: { include_usage: true },
};
// Stream the response with retry for transient errors (including mid-stream failures)
const streamResult = await withRetry(async () => {
const streamAccumulator = createOpenAIStreamAccumulator();
let finishReason = '';
let promptTokens = 0;
let completionTokens = 0;
let totalTokens = 0;
let cacheReadTokens = 0;
let roundText = '';
const { events } = await httpRequestStream(ZEN_OPENAI_URL, {
// Retry only the HTTP connection (429/502/503 are caught before any events are emitted).
// Event processing is outside retry scope to prevent double-emission of onDelta on retry.
const { events } = await withRetry(async () => {
return httpRequestStream(ZEN_OPENAI_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -783,6 +776,16 @@ export class OpenCodeManager {
body: JSON.stringify(body),
signal,
});
});
// Process stream events outside retry scope — onDelta is never called twice for the same text
const streamAccumulator = createOpenAIStreamAccumulator();
let finishReason = '';
let promptTokens = 0;
let completionTokens = 0;
let totalTokens = 0;
let cacheReadTokens = 0;
let roundText = '';
for await (const event of events) {
const result = parseOpenAIStreamEvent(event, streamAccumulator);
@@ -808,10 +811,7 @@ export class OpenCodeManager {
if (result.done) break;
}
return { roundText, finishReason, toolCalls: streamAccumulator.toolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens };
});
const { roundText, finishReason, toolCalls: streamToolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens } = streamResult;
const streamToolCalls = streamAccumulator.toolCalls;
accumulatedText += roundText;
// Emit token usage after stream completes
@@ -915,6 +915,8 @@ export class OpenCodeManager {
tool_call_id: toolCall.id,
});
}
if (signal.aborted) break;
}
// Hit max rounds

View File

@@ -323,8 +323,15 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);
* Retry a function with exponential backoff for transient HTTP errors.
*
* Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable).
* Does NOT retry on other 4xx errors or abort.
* Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since
* these indicate transient network failures during connection.
*
* Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort.
* Respects Retry-After header for 429 responses.
*
* Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry,
* NOT the event processing loop. This ensures onDelta callbacks are never
* called twice for the same text on retry.
*/
export async function withRetry<T>(
fn: () => Promise<T>,

View File

@@ -1071,6 +1071,122 @@ describe('mid-stream retry with withRetry', () => {
});
});
// ── Connection-only retry (no double-emission) ──
describe('connection-only retry pattern (withRetry wrapping httpRequestStream)', () => {
/**
 * Start a throwaway HTTP server on an OS-assigned port (port 0) for one test.
 *
 * @param handler - Request listener passed straight to `http.createServer`.
 * @returns A promise for the server's base URL (`http://localhost:<port>`) and a
 *   `close` function whose promise resolves when the server's close callback fires.
 *
 * The returned promise rejects if the server emits `'error'` before it starts
 * listening, or if it does not report a TCP port, so a bind failure shows up as
 * a failed test instead of an uncaught `'error'` event.
 */
function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise<void> }> {
  return new Promise((resolve, reject) => {
    const server = http.createServer(handler);
    // Surface startup failures (e.g. bind errors) through the promise.
    server.once('error', reject);
    server.listen(0, () => {
      // Startup succeeded; the startup-only error hook is no longer needed.
      server.off('error', reject);
      const addr = server.address();
      // address() is null when not listening and a string for pipe/socket servers.
      if (addr === null || typeof addr === 'string') {
        server.close();
        reject(new Error('test server did not bind to a TCP port'));
        return;
      }
      resolve({
        url: `http://localhost:${addr.port}`,
        close: () => new Promise<void>((r) => server.close(() => r())),
      });
    });
  });
}
// First request is rejected with 429 (Retry-After: 0); the second one streams an
// OpenAI-style SSE body. Only the connection is wrapped in withRetry, so every
// text delta must be observed exactly once.
it('retries 429 at connection time without emitting duplicate deltas', async () => {
  const successChunks = [
    'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n',
    'data: {"choices":[{"delta":{"content":" world"}}]}\n\n',
    'data: [DONE]\n\n',
  ];
  let attempts = 0;
  const server = await startTestServer((_req, res) => {
    attempts += 1;
    if (attempts > 1) {
      // Any request after the first gets the successful event stream.
      res.writeHead(200, { 'Content-Type': 'text/event-stream' });
      for (const chunk of successChunks) {
        res.write(chunk);
      }
      res.end();
    } else {
      // Very first request: rate-limited before any event is sent.
      res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '0' });
      res.end(JSON.stringify({ error: { message: 'Rate limited' } }));
    }
  });

  try {
    // Retry covers only the connection; event processing happens afterwards.
    const stream = await withRetry(() => httpRequestStream(server.url, { method: 'POST', body: '{}' }));
    const accumulator = createOpenAIStreamAccumulator();
    const received: string[] = [];
    for await (const sseEvent of stream.events) {
      const { textDelta } = parseOpenAIStreamEvent(sseEvent, accumulator);
      if (textDelta) {
        received.push(textDelta);
      }
    }

    // One entry per delta: nothing was emitted twice.
    expect(received).toEqual(['Hello', ' world']);
    // One rejected attempt plus one successful attempt.
    expect(attempts).toBe(2);
  } finally {
    await server.close();
  }
});
// The server answers 200, sends one delta, then destroys the socket. Because only
// the connection is wrapped in withRetry, the failure must reach the consumer of
// the event stream and must not trigger a second request.
it('mid-stream TCP error propagates without retry when only connection is wrapped', async () => {
  let requestCount = 0;
  const srv = await startTestServer((_req, res) => {
    requestCount++;
    res.writeHead(200, { 'Content-Type': 'text/event-stream' });
    res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n');
    // Destroy socket to simulate mid-stream TCP disconnect
    setTimeout(() => res.destroy(), 20);
  });
  try {
    const deltas: string[] = [];
    // Only connection is retried — mid-stream errors propagate
    const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' }));
    const acc = createOpenAIStreamAccumulator();
    await expect(async () => {
      for await (const event of events) {
        const result = parseOpenAIStreamEvent(event, acc);
        if (result.textDelta) deltas.push(result.textDelta);
      }
    }).rejects.toThrow();
    // Partial delta was received before the error — no duplication
    expect(deltas).toEqual(['Hi']);
    // Back up the "without retry" claim in the title: the connection succeeded
    // on the first attempt and the later failure must not cause a reconnect.
    expect(requestCount).toBe(1);
  } finally {
    await srv.close();
  }
});
// First request fails with a bare 502; the second one streams an Anthropic-style
// SSE body. The single text delta must arrive once, after exactly two requests.
it('retries 502 at connection time then streams successfully', async () => {
  const successChunks = [
    'event: message_start\ndata: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":10}}}\n\n',
    'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"OK"}}\n\n',
    'event: message_stop\ndata: {"type":"message_stop"}\n\n',
  ];
  let attempts = 0;
  const server = await startTestServer((_req, res) => {
    attempts += 1;
    if (attempts > 1) {
      // Any request after the first gets the successful event stream.
      res.writeHead(200, { 'Content-Type': 'text/event-stream' });
      for (const chunk of successChunks) {
        res.write(chunk);
      }
      res.end();
    } else {
      // Very first request: gateway failure before any event is sent.
      res.writeHead(502);
      res.end('Bad Gateway');
    }
  });

  try {
    const stream = await withRetry(() => httpRequestStream(server.url, { method: 'POST', body: '{}' }));
    const accumulator = createAnthropicStreamAccumulator();
    const received: string[] = [];
    for await (const sseEvent of stream.events) {
      const { textDelta } = parseAnthropicStreamEvent(sseEvent, accumulator);
      if (textDelta) {
        received.push(textDelta);
      }
    }

    expect(received).toEqual(['OK']);
    expect(attempts).toBe(2);
  } finally {
    await server.close();
  }
});
});
// ── SSE spec compliance ──
describe('SSE spec compliance - single space removal', () => {