diff --git a/src/main/engine/OpenCodeManager.ts b/src/main/engine/OpenCodeManager.ts index fd3eac8..749deaf 100644 --- a/src/main/engine/OpenCodeManager.ts +++ b/src/main/engine/OpenCodeManager.ts @@ -13,14 +13,12 @@ import http from 'http'; import { URL } from 'url'; import { BrowserWindow } from 'electron'; import { - parseSSELines, parseAnthropicStreamEvent, parseOpenAIStreamEvent, createAnthropicStreamAccumulator, createOpenAIStreamAccumulator, httpRequestStream, withRetry, - type HttpStreamError, } from './streaming'; import { ChatEngine } from './ChatEngine'; import { PostEngine, type PostData } from './PostEngine'; @@ -485,17 +483,10 @@ export class OpenCodeManager { cache_control: { type: 'ephemeral' }, }; - // Stream the response with retry for transient errors (including mid-stream failures) - const streamResult = await withRetry(async () => { - const streamAccumulator = createAnthropicStreamAccumulator(); - let stopReason = ''; - let inputTokens = 0; - let outputTokens = 0; - let cacheReadTokens = 0; - let cacheWriteTokens = 0; - let roundText = ''; - - const { events } = await httpRequestStream(ZEN_ANTHROPIC_URL, { + // Retry only the HTTP connection (429/502/503 are caught before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. + const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_ANTHROPIC_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -506,35 +497,42 @@ export class OpenCodeManager { body: JSON.stringify(body), signal, }); - - for await (const event of events) { - const result = parseAnthropicStreamEvent(event, streamAccumulator); - - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); - } - } - - if (result.usage) { - if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; - if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; - } - - if (result.finishReason) { - stopReason = result.finishReason; - } - - if (result.done) break; - } - - return { roundText, stopReason, toolCalls: streamAccumulator.toolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }; }); - const { roundText, stopReason, toolCalls: streamToolCalls, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens } = streamResult; + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createAnthropicStreamAccumulator(); + let stopReason = ''; + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let roundText = ''; + + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens; + if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens; + } + + if (result.finishReason) { + stopReason = result.finishReason; + } + + if (result.done) break; + } + + const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; // Emit token usage after stream completes @@ -681,6 +679,8 @@ export class OpenCodeManager { } } + if (signal.aborted) break; + // Add assistant response and tool results to messages for next round messages = [ ...messages, @@ -764,17 +764,10 @@ export class OpenCodeManager { stream_options: { include_usage: true }, }; - // Stream the response with retry for transient errors (including mid-stream failures) - const streamResult = await withRetry(async () => { - const streamAccumulator = createOpenAIStreamAccumulator(); - let finishReason = ''; - let promptTokens = 0; - let completionTokens = 0; - let totalTokens = 0; - let cacheReadTokens = 0; - let roundText = ''; - - const { events } = await httpRequestStream(ZEN_OPENAI_URL, { + // Retry only the HTTP connection (429/502/503 are caught before any events are emitted). + // Event processing is outside retry scope to prevent double-emission of onDelta on retry. + const { events } = await withRetry(async () => { + return httpRequestStream(ZEN_OPENAI_URL, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -783,35 +776,42 @@ export class OpenCodeManager { body: JSON.stringify(body), signal, }); - - for await (const event of events) { - const result = parseOpenAIStreamEvent(event, streamAccumulator); - - if (result.textDelta) { - roundText += result.textDelta; - if (callbacks.onDelta) { - callbacks.onDelta(result.textDelta); - } - } - - if (result.usage) { - if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; - if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; - if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; - if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; - } - - if (result.finishReason) { - finishReason = result.finishReason; - } - - if (result.done) break; - } - - return { roundText, finishReason, toolCalls: streamAccumulator.toolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens }; }); - const { roundText, finishReason, toolCalls: streamToolCalls, promptTokens, completionTokens, totalTokens, cacheReadTokens } = streamResult; + // Process stream events outside retry scope — onDelta is never called twice for the same text + const streamAccumulator = createOpenAIStreamAccumulator(); + let finishReason = ''; + let promptTokens = 0; + let completionTokens = 0; + let totalTokens = 0; + let cacheReadTokens = 0; + let roundText = ''; + + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, streamAccumulator); + + if (result.textDelta) { + roundText += result.textDelta; + if (callbacks.onDelta) { + callbacks.onDelta(result.textDelta); + } + } + + if (result.usage) { + if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens; + if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens; + if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens; + if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens; + } + + if (result.finishReason) { + finishReason = result.finishReason; + } + + if (result.done) break; + } + + const streamToolCalls = streamAccumulator.toolCalls; accumulatedText += roundText; // Emit token usage after stream completes @@ -915,6 +915,8 @@ export class OpenCodeManager { tool_call_id: toolCall.id, }); } + + if (signal.aborted) break; } // Hit max rounds diff --git a/src/main/engine/streaming.ts b/src/main/engine/streaming.ts index 1cc6017..9cb50a9 100644 --- a/src/main/engine/streaming.ts +++ b/src/main/engine/streaming.ts @@ -323,8 +323,15 @@ const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]); * Retry a function with exponential backoff for transient HTTP errors. * * Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable). - * Does NOT retry on other 4xx errors or abort. + * Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since + * these indicate transient network failures during connection. + * + * Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort. * Respects Retry-After header for 429 responses. + * + * Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry, + * NOT the event processing loop. This ensures onDelta callbacks are never + * called twice for the same text on retry. */ export async function withRetry( fn: () => Promise, diff --git a/tests/engine/streaming.test.ts b/tests/engine/streaming.test.ts index cd8ba00..3adcb6d 100644 --- a/tests/engine/streaming.test.ts +++ b/tests/engine/streaming.test.ts @@ -1071,6 +1071,122 @@ describe('mid-stream retry with withRetry', () => { }); }); +// ── Connection-only retry (no double-emission) ── + +describe('connection-only retry pattern (withRetry wrapping httpRequestStream)', () => { + function startTestServer(handler: (req: http.IncomingMessage, res: http.ServerResponse) => void): Promise<{ url: string; close: () => Promise }> { + return new Promise((resolve) => { + const server = http.createServer(handler); + server.listen(0, () => { + const addr = server.address() as { port: number }; + resolve({ + url: `http://localhost:${addr.port}`, + close: () => new Promise((r) => server.close(() => r())), + }); + }); + }); + } + + it('retries 429 at connection time without emitting duplicate deltas', async () => { + let requestCount = 0; + const srv = await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(429, { 'Content-Type': 'application/json', 'Retry-After': '0' }); + res.end(JSON.stringify({ error: { message: 'Rate limited' } })); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'); + res.write('data: {"choices":[{"delta":{"content":" world"}}]}\n\n'); + res.write('data: [DONE]\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + // Retry only the connection, process events outside retry + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + // Each delta appears exactly once — no double-emission + expect(deltas).toEqual(['Hello', ' world']); + expect(requestCount).toBe(2); // 1 failed + 1 success + } finally { + await srv.close(); + } + }); + + it('mid-stream TCP error propagates without retry when only connection is wrapped', async () => { + const srv = await startTestServer((_req, res) => { + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('data: {"choices":[{"delta":{"content":"Hi"}}]}\n\n'); + // Destroy socket to simulate mid-stream TCP disconnect + setTimeout(() => res.destroy(), 20); + }); + + try { + const deltas: string[] = []; + + // Only connection is retried — mid-stream errors propagate + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createOpenAIStreamAccumulator(); + await expect(async () => { + for await (const event of events) { + const result = parseOpenAIStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + }).rejects.toThrow(); + + // Partial delta was received before the error — no duplication + expect(deltas).toEqual(['Hi']); + } finally { + await srv.close(); + } + }); + + it('retries 502 at connection time then streams successfully', async () => { + let requestCount = 0; + const srv = await startTestServer((_req, res) => { + requestCount++; + if (requestCount === 1) { + res.writeHead(502); + res.end('Bad Gateway'); + return; + } + res.writeHead(200, { 'Content-Type': 'text/event-stream' }); + res.write('event: message_start\ndata: {"type":"message_start","message":{"id":"msg_1","usage":{"input_tokens":10}}}\n\n'); + res.write('event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"OK"}}\n\n'); + res.write('event: message_stop\ndata: {"type":"message_stop"}\n\n'); + res.end(); + }); + + try { + const deltas: string[] = []; + + const { events } = await withRetry(() => httpRequestStream(srv.url, { method: 'POST', body: '{}' })); + + const acc = createAnthropicStreamAccumulator(); + for await (const event of events) { + const result = parseAnthropicStreamEvent(event, acc); + if (result.textDelta) deltas.push(result.textDelta); + } + + expect(deltas).toEqual(['OK']); + expect(requestCount).toBe(2); + } finally { + await srv.close(); + } + }); +}); + // ── SSE spec compliance ── describe('SSE spec compliance - single space removal', () => {