Merge pull request #26 from rfc1437/feature/sse-streaming
feat: SSE streaming for chat providers
This commit is contained in:
8
.vscode/settings.json
vendored
8
.vscode/settings.json
vendored
@@ -4,6 +4,10 @@
|
||||
"npx tsc": true,
|
||||
"git remote": true,
|
||||
"npx asar": true,
|
||||
"npx tsx": true
|
||||
"npx tsx": true,
|
||||
"gh": true,
|
||||
"git add": true,
|
||||
"git commit": true,
|
||||
"git push": true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,11 @@ This document provides context and best practices for GitHub Copilot when workin
|
||||
- Make the plan extremely concise. Sacrifice grammar for the sake of concision.
|
||||
- At the end of each plan, give me a list of unresolved questions to answer, if any.
|
||||
|
||||
## Commits
|
||||
|
||||
- commit messages are short - one sentence. do not write long articles.
|
||||
- pull requests are more verbose and especially give reasoning for changes
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ MANDATORY: Test-First Development
|
||||
|
||||
@@ -12,6 +12,14 @@ import https from 'https';
|
||||
import http from 'http';
|
||||
import { URL } from 'url';
|
||||
import { BrowserWindow } from 'electron';
|
||||
import {
|
||||
parseAnthropicStreamEvent,
|
||||
parseOpenAIStreamEvent,
|
||||
createAnthropicStreamAccumulator,
|
||||
createOpenAIStreamAccumulator,
|
||||
httpRequestStream,
|
||||
withRetry,
|
||||
} from './streaming';
|
||||
import { ChatEngine } from './ChatEngine';
|
||||
import { PostEngine, type PostData } from './PostEngine';
|
||||
import { MediaEngine, type MediaData } from './MediaEngine';
|
||||
@@ -129,6 +137,8 @@ interface AnthropicContentBlock {
|
||||
input?: unknown;
|
||||
tool_use_id?: string;
|
||||
content?: string | AnthropicToolResultContent[];
|
||||
is_error?: boolean;
|
||||
signature?: string;
|
||||
source?: {
|
||||
type: 'base64';
|
||||
media_type: string;
|
||||
@@ -463,6 +473,7 @@ export class OpenCodeManager {
|
||||
|
||||
while (round < MAX_TOOL_ROUNDS) {
|
||||
round++;
|
||||
if (signal.aborted) break;
|
||||
|
||||
const body: Record<string, unknown> = {
|
||||
model: modelId,
|
||||
@@ -470,42 +481,79 @@ export class OpenCodeManager {
|
||||
system: systemPrompt,
|
||||
messages,
|
||||
tools,
|
||||
stream: true,
|
||||
cache_control: { type: 'ephemeral' },
|
||||
};
|
||||
|
||||
const response = await this.httpRequest(ZEN_ANTHROPIC_URL, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'x-api-key': this.apiKey,
|
||||
'Authorization': `Bearer ${this.apiKey}`,
|
||||
'anthropic-version': '2023-06-01',
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
// Retry only the HTTP connection (429/502/503 are caught before any events are emitted).
|
||||
// Event processing is outside retry scope to prevent double-emission of onDelta on retry.
|
||||
const { events } = await withRetry(async () => {
|
||||
return httpRequestStream(ZEN_ANTHROPIC_URL, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'x-api-key': this.apiKey,
|
||||
'Authorization': `Bearer ${this.apiKey}`,
|
||||
'anthropic-version': '2023-06-01',
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
});
|
||||
});
|
||||
|
||||
if (response.statusCode >= 400) {
|
||||
const errorMsg = this.parseErrorResponse(response);
|
||||
throw new Error(errorMsg);
|
||||
// Process stream events outside retry scope — onDelta is never called twice for the same text
|
||||
const streamAccumulator = createAnthropicStreamAccumulator();
|
||||
let stopReason = '';
|
||||
let inputTokens = 0;
|
||||
let outputTokens = 0;
|
||||
let cacheReadTokens = 0;
|
||||
let cacheWriteTokens = 0;
|
||||
let roundText = '';
|
||||
let receivedUsage = false;
|
||||
|
||||
try {
|
||||
for await (const event of events) {
|
||||
const result = parseAnthropicStreamEvent(event, streamAccumulator);
|
||||
|
||||
if (result.textDelta) {
|
||||
roundText += result.textDelta;
|
||||
if (callbacks.onDelta) {
|
||||
callbacks.onDelta(result.textDelta);
|
||||
}
|
||||
}
|
||||
|
||||
if (result.usage) {
|
||||
receivedUsage = true;
|
||||
if (result.usage.inputTokens !== undefined) inputTokens = result.usage.inputTokens;
|
||||
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
|
||||
if (result.usage.cacheWriteTokens !== undefined) cacheWriteTokens = result.usage.cacheWriteTokens;
|
||||
if (result.usage.outputTokens !== undefined) outputTokens = result.usage.outputTokens;
|
||||
}
|
||||
|
||||
if (result.finishReason) {
|
||||
stopReason = result.finishReason;
|
||||
}
|
||||
|
||||
if (result.done) break;
|
||||
}
|
||||
} finally {
|
||||
// Preserve text already emitted via onDelta even if the stream errors mid-round
|
||||
accumulatedText += roundText;
|
||||
}
|
||||
|
||||
const data = JSON.parse(response.body);
|
||||
const streamToolCalls = streamAccumulator.toolCalls;
|
||||
const streamThinkingBlocks = streamAccumulator.thinkingBlocks;
|
||||
|
||||
// Extract and emit token usage
|
||||
if (data.usage && callbacks.onTokenUsage) {
|
||||
const usage = data.usage;
|
||||
const cacheReadTokens = usage.cache_read_input_tokens || 0;
|
||||
const cacheWriteTokens = usage.cache_creation_input_tokens || 0;
|
||||
const inputTokens = (usage.input_tokens || 0) - cacheReadTokens - cacheWriteTokens;
|
||||
const outputTokens = usage.output_tokens || 0;
|
||||
const totalTokens = (usage.input_tokens || 0) + outputTokens;
|
||||
// Emit token usage after stream completes (only when usage data was received)
|
||||
if (callbacks.onTokenUsage && receivedUsage) {
|
||||
const adjustedInputTokens = inputTokens - cacheReadTokens - cacheWriteTokens;
|
||||
const totalTokens = inputTokens + outputTokens;
|
||||
|
||||
const prev = this.conversationUsage.get(conversationId) || {
|
||||
inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0,
|
||||
};
|
||||
const cumulative = {
|
||||
inputTokens: prev.inputTokens + inputTokens,
|
||||
inputTokens: prev.inputTokens + adjustedInputTokens,
|
||||
outputTokens: prev.outputTokens + outputTokens,
|
||||
cacheReadTokens: prev.cacheReadTokens + cacheReadTokens,
|
||||
cacheWriteTokens: prev.cacheWriteTokens + cacheWriteTokens,
|
||||
@@ -513,7 +561,7 @@ export class OpenCodeManager {
|
||||
this.conversationUsage.set(conversationId, cumulative);
|
||||
|
||||
callbacks.onTokenUsage({
|
||||
inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens,
|
||||
inputTokens: adjustedInputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, totalTokens,
|
||||
cumulativeInputTokens: cumulative.inputTokens,
|
||||
cumulativeOutputTokens: cumulative.outputTokens,
|
||||
cumulativeCacheReadTokens: cumulative.cacheReadTokens,
|
||||
@@ -522,35 +570,20 @@ export class OpenCodeManager {
|
||||
});
|
||||
}
|
||||
|
||||
console.log('[OpenCodeManager] Round', round, 'stop_reason:', data.stop_reason, 'content blocks:', JSON.stringify(data.content?.map((b: AnthropicContentBlock) => ({ type: b.type, textLen: b.text?.length, name: b.name }))));
|
||||
|
||||
if (!data.content) {
|
||||
throw new Error('API response missing content field');
|
||||
}
|
||||
|
||||
// Check if there are tool_use blocks
|
||||
const toolUseBlocks = (data.content as AnthropicContentBlock[]).filter(
|
||||
(b: AnthropicContentBlock) => b.type === 'tool_use'
|
||||
);
|
||||
|
||||
// Capture text from any block type that has a text field (text, thinking, etc.)
|
||||
const textBlocks = (data.content as AnthropicContentBlock[]).filter(
|
||||
(b: AnthropicContentBlock) => b.text
|
||||
);
|
||||
|
||||
// Accumulate and stream text content to frontend
|
||||
for (const block of textBlocks) {
|
||||
if (block.text) {
|
||||
accumulatedText += block.text;
|
||||
if (callbacks.onDelta) {
|
||||
callbacks.onDelta(block.text);
|
||||
}
|
||||
// Collect tool calls from stream accumulator
|
||||
const toolUseBlocks: Array<{ id: string; name: string; input: unknown; parseError?: string }> = [];
|
||||
for (const [, tc] of streamToolCalls) {
|
||||
try {
|
||||
toolUseBlocks.push({ id: tc.id, name: tc.name, input: JSON.parse(tc.arguments) });
|
||||
} catch (e) {
|
||||
console.error(`[OpenCodeManager] Failed to parse tool arguments for ${tc.name}:`, tc.arguments);
|
||||
toolUseBlocks.push({ id: tc.id, name: tc.name, input: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` });
|
||||
}
|
||||
}
|
||||
|
||||
console.log('[OpenCodeManager] Round', round, 'accumulatedText length:', accumulatedText.length, 'toolUseBlocks:', toolUseBlocks.length);
|
||||
console.log('[OpenCodeManager] Round', round, 'stopReason:', stopReason, 'accumulatedText length:', accumulatedText.length, 'toolCalls:', toolUseBlocks.length);
|
||||
|
||||
if (toolUseBlocks.length === 0 || data.stop_reason !== 'tool_use') {
|
||||
if (toolUseBlocks.length === 0 || stopReason !== 'tool_use') {
|
||||
// No more tool calls - return all accumulated text
|
||||
console.log('[OpenCodeManager] Returning accumulated text length:', accumulatedText.length);
|
||||
return { content: accumulatedText, toolCalls: allToolCalls };
|
||||
@@ -558,11 +591,37 @@ export class OpenCodeManager {
|
||||
|
||||
// Execute tool calls
|
||||
const toolResults: AnthropicContentBlock[] = [];
|
||||
// Build assistant content blocks for the next message round
|
||||
const assistantContentBlocks: AnthropicContentBlock[] = [];
|
||||
|
||||
// Add thinking blocks first (Anthropic requires thinking before text when extended thinking is enabled)
|
||||
for (const [, tb] of streamThinkingBlocks) {
|
||||
if (tb.text) {
|
||||
const thinkingBlock: AnthropicContentBlock = { type: 'thinking', text: tb.text };
|
||||
if (tb.signature) {
|
||||
thinkingBlock.signature = tb.signature;
|
||||
}
|
||||
assistantContentBlocks.push(thinkingBlock);
|
||||
}
|
||||
}
|
||||
|
||||
// Add text block with text from this round
|
||||
if (roundText) {
|
||||
assistantContentBlocks.push({ type: 'text', text: roundText });
|
||||
}
|
||||
|
||||
for (const toolBlock of toolUseBlocks) {
|
||||
const toolName = toolBlock.name!;
|
||||
const toolName = toolBlock.name;
|
||||
const toolArgs = toolBlock.input;
|
||||
const toolUseId = toolBlock.id!;
|
||||
const toolUseId = toolBlock.id;
|
||||
|
||||
// Add tool_use block to assistant content
|
||||
assistantContentBlocks.push({
|
||||
type: 'tool_use',
|
||||
id: toolUseId,
|
||||
name: toolName,
|
||||
input: toolArgs,
|
||||
});
|
||||
|
||||
allToolCalls.push({ name: toolName, args: toolArgs });
|
||||
|
||||
@@ -570,6 +629,21 @@ export class OpenCodeManager {
|
||||
callbacks.onToolCall({ name: toolName, args: toolArgs });
|
||||
}
|
||||
|
||||
// If JSON parsing of tool arguments failed, report the error to the model
|
||||
if (toolBlock.parseError) {
|
||||
const errorResult = { error: true, message: toolBlock.parseError };
|
||||
if (callbacks.onToolResult) {
|
||||
callbacks.onToolResult({ name: toolName, result: errorResult });
|
||||
}
|
||||
toolResults.push({
|
||||
type: 'tool_result',
|
||||
tool_use_id: toolUseId,
|
||||
content: JSON.stringify(errorResult),
|
||||
is_error: true,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this is a render tool — generate A2UI messages instead of executing
|
||||
if (isRenderTool(toolName)) {
|
||||
const a2uiMessages = generateFromToolCall(
|
||||
@@ -593,7 +667,8 @@ export class OpenCodeManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Execute the tool
|
||||
// Execute the tool (check abort before each tool execution)
|
||||
if (signal.aborted) break;
|
||||
const result = await this.executeTool(toolName, toolArgs as Record<string, unknown>);
|
||||
|
||||
if (callbacks.onToolResult) {
|
||||
@@ -640,10 +715,12 @@ export class OpenCodeManager {
|
||||
}
|
||||
}
|
||||
|
||||
if (signal.aborted) break;
|
||||
|
||||
// Add assistant response and tool results to messages for next round
|
||||
messages = [
|
||||
...messages,
|
||||
{ role: 'assistant' as const, content: data.content },
|
||||
{ role: 'assistant' as const, content: assistantContentBlocks },
|
||||
{ role: 'user' as const, content: toolResults },
|
||||
];
|
||||
}
|
||||
@@ -712,39 +789,77 @@ export class OpenCodeManager {
|
||||
|
||||
while (round < MAX_TOOL_ROUNDS) {
|
||||
round++;
|
||||
if (signal.aborted) break;
|
||||
|
||||
const body: Record<string, unknown> = {
|
||||
model: modelId,
|
||||
max_tokens: 4096,
|
||||
messages,
|
||||
tools: openaiTools,
|
||||
stream: true,
|
||||
stream_options: { include_usage: true },
|
||||
};
|
||||
|
||||
const response = await this.httpRequest(ZEN_OPENAI_URL, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': `Bearer ${this.apiKey}`,
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
// Retry only the HTTP connection (429/502/503 are caught before any events are emitted).
|
||||
// Event processing is outside retry scope to prevent double-emission of onDelta on retry.
|
||||
const { events } = await withRetry(async () => {
|
||||
return httpRequestStream(ZEN_OPENAI_URL, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': `Bearer ${this.apiKey}`,
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
});
|
||||
});
|
||||
|
||||
if (response.statusCode >= 400) {
|
||||
const errorMsg = this.parseErrorResponse(response);
|
||||
throw new Error(errorMsg);
|
||||
// Process stream events outside retry scope — onDelta is never called twice for the same text
|
||||
const streamAccumulator = createOpenAIStreamAccumulator();
|
||||
let finishReason = '';
|
||||
let promptTokens = 0;
|
||||
let completionTokens = 0;
|
||||
let totalTokens = 0;
|
||||
let cacheReadTokens = 0;
|
||||
let roundText = '';
|
||||
let receivedUsage = false;
|
||||
|
||||
try {
|
||||
for await (const event of events) {
|
||||
const result = parseOpenAIStreamEvent(event, streamAccumulator);
|
||||
|
||||
if (result.textDelta) {
|
||||
roundText += result.textDelta;
|
||||
if (callbacks.onDelta) {
|
||||
callbacks.onDelta(result.textDelta);
|
||||
}
|
||||
}
|
||||
|
||||
if (result.usage) {
|
||||
receivedUsage = true;
|
||||
if (result.usage.promptTokens !== undefined) promptTokens = result.usage.promptTokens;
|
||||
if (result.usage.completionTokens !== undefined) completionTokens = result.usage.completionTokens;
|
||||
if (result.usage.totalTokens !== undefined) totalTokens = result.usage.totalTokens;
|
||||
if (result.usage.cacheReadTokens !== undefined) cacheReadTokens = result.usage.cacheReadTokens;
|
||||
}
|
||||
|
||||
if (result.finishReason) {
|
||||
finishReason = result.finishReason;
|
||||
}
|
||||
|
||||
if (result.done) break;
|
||||
}
|
||||
} finally {
|
||||
// Preserve text already emitted via onDelta even if the stream errors mid-round
|
||||
accumulatedText += roundText;
|
||||
}
|
||||
|
||||
const data = JSON.parse(response.body);
|
||||
const choice = data.choices?.[0];
|
||||
const streamToolCalls = streamAccumulator.toolCalls;
|
||||
|
||||
// Extract and emit token usage (OpenAI format)
|
||||
if (data.usage && callbacks.onTokenUsage) {
|
||||
const usage = data.usage;
|
||||
const cacheReadTokens = usage.prompt_tokens_details?.cached_tokens || 0;
|
||||
const inputTokens = (usage.prompt_tokens || 0) - cacheReadTokens;
|
||||
const outputTokens = usage.completion_tokens || 0;
|
||||
const totalTokens = usage.total_tokens || (usage.prompt_tokens || 0) + outputTokens;
|
||||
// Emit token usage after stream completes (only when usage data was received)
|
||||
if (callbacks.onTokenUsage && receivedUsage) {
|
||||
const inputTokens = promptTokens - cacheReadTokens;
|
||||
const outputTokens = completionTokens;
|
||||
|
||||
const prev = this.conversationUsage.get(conversationId) || {
|
||||
inputTokens: 0, outputTokens: 0, cacheReadTokens: 0, cacheWriteTokens: 0,
|
||||
@@ -758,7 +873,9 @@ export class OpenCodeManager {
|
||||
this.conversationUsage.set(conversationId, cumulative);
|
||||
|
||||
callbacks.onTokenUsage({
|
||||
inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens: 0, totalTokens,
|
||||
inputTokens, outputTokens, cacheReadTokens,
|
||||
cacheWriteTokens: 0, // OpenAI streaming does not report cache write tokens
|
||||
totalTokens: totalTokens || inputTokens + outputTokens,
|
||||
cumulativeInputTokens: cumulative.inputTokens,
|
||||
cumulativeOutputTokens: cumulative.outputTokens,
|
||||
cumulativeCacheReadTokens: cumulative.cacheReadTokens,
|
||||
@@ -767,66 +884,64 @@ export class OpenCodeManager {
|
||||
});
|
||||
}
|
||||
|
||||
console.log('[OpenCodeManager:OpenAI] Round', round, 'status:', response.statusCode, 'content type:', typeof choice?.message?.content, 'content length:', choice?.message?.content?.length, 'tool_calls:', choice?.message?.tool_calls?.length);
|
||||
|
||||
if (!choice?.message) {
|
||||
throw new Error('API response missing expected message content');
|
||||
// Collect tool calls from stream accumulator
|
||||
const parsedToolCalls: Array<{ id: string; name: string; args: unknown; parseError?: string }> = [];
|
||||
for (const [, tc] of streamToolCalls) {
|
||||
try {
|
||||
parsedToolCalls.push({ id: tc.id, name: tc.name, args: JSON.parse(tc.arguments) });
|
||||
} catch (e) {
|
||||
console.error(`[OpenCodeManager:OpenAI] Failed to parse tool arguments for ${tc.name}:`, tc.arguments);
|
||||
parsedToolCalls.push({ id: tc.id, name: tc.name, args: {}, parseError: `Failed to parse tool arguments: ${(e as Error).message}` });
|
||||
}
|
||||
}
|
||||
|
||||
// Handle content that might be a string or an array of content parts
|
||||
let textContent = '';
|
||||
const content = choice.message.content;
|
||||
if (typeof content === 'string') {
|
||||
textContent = content;
|
||||
} else if (Array.isArray(content)) {
|
||||
// Handle array of content parts (some models return this format)
|
||||
// Accept any part that has a text field, regardless of type
|
||||
textContent = content
|
||||
.filter((part: { type?: string; text?: string }) => part.text)
|
||||
.map((part: { text: string }) => part.text)
|
||||
.join('');
|
||||
|
||||
// Log what types we're seeing for debugging
|
||||
const types = content.map((p: { type?: string }) => p.type).filter(Boolean);
|
||||
if (types.length > 0) {
|
||||
console.log('[OpenCodeManager:OpenAI] Content block types:', types);
|
||||
}
|
||||
} else if (content && typeof content === 'object') {
|
||||
// Handle single object with text field
|
||||
if ('text' in content && typeof content.text === 'string') {
|
||||
textContent = content.text;
|
||||
}
|
||||
}
|
||||
|
||||
if (textContent) {
|
||||
accumulatedText += textContent;
|
||||
if (callbacks.onDelta) {
|
||||
callbacks.onDelta(textContent);
|
||||
}
|
||||
}
|
||||
console.log('[OpenCodeManager:OpenAI] Round', round, 'finishReason:', finishReason, 'text length:', accumulatedText.length, 'toolCalls:', parsedToolCalls.length);
|
||||
|
||||
// If no tool calls, we're done
|
||||
if (!choice.message.tool_calls || choice.message.tool_calls.length === 0) {
|
||||
if (parsedToolCalls.length === 0 || finishReason !== 'tool_calls') {
|
||||
console.log('[OpenCodeManager:OpenAI] Done. Accumulated text length:', accumulatedText.length);
|
||||
return { content: accumulatedText, toolCalls: allToolCalls };
|
||||
}
|
||||
|
||||
// Add assistant message (with tool_calls) to conversation
|
||||
messages.push(choice.message);
|
||||
// Build the assistant message with tool_calls for conversation history
|
||||
const assistantMessage: Record<string, unknown> = {
|
||||
role: 'assistant',
|
||||
content: roundText || null,
|
||||
tool_calls: parsedToolCalls.map((tc) => ({
|
||||
id: tc.id,
|
||||
type: 'function',
|
||||
function: { name: tc.name, arguments: JSON.stringify(tc.args) },
|
||||
})),
|
||||
};
|
||||
messages.push(assistantMessage);
|
||||
|
||||
// Execute tool calls and add results
|
||||
for (const toolCall of choice.message.tool_calls) {
|
||||
const toolName = toolCall.function.name;
|
||||
const toolArgs = JSON.parse(toolCall.function.arguments || '{}');
|
||||
for (const toolCall of parsedToolCalls) {
|
||||
const toolName = toolCall.name;
|
||||
const toolArgs = toolCall.args;
|
||||
|
||||
allToolCalls.push({ name: toolName, args: toolArgs });
|
||||
if (callbacks.onToolCall) {
|
||||
callbacks.onToolCall({ name: toolName, args: toolArgs });
|
||||
}
|
||||
|
||||
// If JSON parsing of tool arguments failed, report the error to the model
|
||||
if (toolCall.parseError) {
|
||||
const errorResult = { error: true, message: toolCall.parseError };
|
||||
if (callbacks.onToolResult) {
|
||||
callbacks.onToolResult({ name: toolName, result: errorResult });
|
||||
}
|
||||
messages.push({
|
||||
role: 'tool',
|
||||
content: JSON.stringify(errorResult),
|
||||
tool_call_id: toolCall.id,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this is a render tool
|
||||
if (isRenderTool(toolName)) {
|
||||
const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs);
|
||||
const a2uiMessages = generateFromToolCall(conversationId, toolName, toolArgs as Record<string, unknown>);
|
||||
if (a2uiMessages) {
|
||||
emitA2UIMessages(a2uiMessages);
|
||||
}
|
||||
@@ -843,7 +958,9 @@ export class OpenCodeManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
const result = await this.executeTool(toolName, toolArgs);
|
||||
// Check abort before each tool execution
|
||||
if (signal.aborted) break;
|
||||
const result = await this.executeTool(toolName, toolArgs as Record<string, unknown>);
|
||||
|
||||
if (callbacks.onToolResult) {
|
||||
callbacks.onToolResult({ name: toolName, result });
|
||||
@@ -855,6 +972,8 @@ export class OpenCodeManager {
|
||||
tool_call_id: toolCall.id,
|
||||
});
|
||||
}
|
||||
|
||||
if (signal.aborted) break;
|
||||
}
|
||||
|
||||
// Hit max rounds
|
||||
@@ -2251,7 +2370,7 @@ Respond with JSON only: {"title": "...", "alt": "...", "caption": "..."}`;
|
||||
options.signal.addEventListener('abort', () => {
|
||||
req.destroy();
|
||||
reject(new Error('Request cancelled'));
|
||||
});
|
||||
}, { once: true });
|
||||
}
|
||||
|
||||
if (options.body) {
|
||||
|
||||
620
src/main/engine/streaming.ts
Normal file
620
src/main/engine/streaming.ts
Normal file
@@ -0,0 +1,620 @@
|
||||
/**
|
||||
* SSE Streaming Infrastructure
|
||||
*
|
||||
* Provides SSE line parsing, event parsers for OpenAI/Mistral and Anthropic
|
||||
* stream formats, tool-call accumulation, and retry-with-exponential-backoff.
|
||||
*
|
||||
* Used by OpenCodeManager to convert buffered HTTP calls to real-time
|
||||
* token-by-token streaming for all chat providers.
|
||||
*/
|
||||
|
||||
import https from 'https';
|
||||
import http from 'http';
|
||||
import { URL } from 'url';
|
||||
|
||||
// ── Types ──
|
||||
|
||||
export interface SSEEvent {
|
||||
event?: string;
|
||||
data: string;
|
||||
}
|
||||
|
||||
export interface StreamEventResult {
|
||||
/** Text content delta to emit to UI */
|
||||
textDelta?: string;
|
||||
/** Whether the stream is complete */
|
||||
done: boolean;
|
||||
/** Finish reason from the model */
|
||||
finishReason?: string;
|
||||
/** Token usage information */
|
||||
usage?: {
|
||||
promptTokens?: number;
|
||||
completionTokens?: number;
|
||||
totalTokens?: number;
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
cacheReadTokens?: number;
|
||||
cacheWriteTokens?: number;
|
||||
};
|
||||
}
|
||||
|
||||
interface ToolCallAccumulator {
|
||||
id: string;
|
||||
name: string;
|
||||
arguments: string;
|
||||
}
|
||||
|
||||
export interface OpenAIStreamAccumulator {
|
||||
toolCalls: Map<number, ToolCallAccumulator>;
|
||||
}
|
||||
|
||||
export interface AnthropicStreamAccumulator {
|
||||
toolCalls: Map<number, ToolCallAccumulator>;
|
||||
thinkingBlocks: Map<number, { text: string; signature?: string }>;
|
||||
}
|
||||
|
||||
export interface HttpStreamError extends Error {
|
||||
statusCode?: number;
|
||||
retryAfter?: number;
|
||||
isAbort?: boolean;
|
||||
}
|
||||
|
||||
// ── SSE Line Parsing ──
|
||||
|
||||
/**
|
||||
* Parse raw SSE text into structured events.
|
||||
*
|
||||
* SSE protocol: events are separated by double-newlines (\n\n).
|
||||
* Each event can have `event:` and `data:` lines.
|
||||
* Multiple `data:` lines within one event are concatenated with newlines.
|
||||
* Lines starting with `:` are comments (ignored).
|
||||
*
|
||||
* Returns parsed events and any remaining incomplete text (buffer).
|
||||
*/
|
||||
export function parseSSELines(text: string): { events: SSEEvent[]; remaining: string } {
|
||||
const events: SSEEvent[] = [];
|
||||
|
||||
// Normalize \r\n to \n
|
||||
const normalized = text.replace(/\r\n/g, '\n');
|
||||
|
||||
// Split on double-newline (event boundary)
|
||||
const parts = normalized.split('\n\n');
|
||||
|
||||
// Last part may be incomplete (no trailing \n\n)
|
||||
const remaining = normalized.endsWith('\n\n') ? '' : parts.pop() || '';
|
||||
|
||||
for (const part of parts) {
|
||||
if (!part.trim()) continue;
|
||||
|
||||
let eventType: string | undefined;
|
||||
const dataLines: string[] = [];
|
||||
|
||||
for (const line of part.split('\n')) {
|
||||
// Comment lines start with ':'
|
||||
if (line.startsWith(':')) continue;
|
||||
|
||||
if (line.startsWith('event: ') || line.startsWith('event:')) {
|
||||
const afterColon = line.slice(line.indexOf(':') + 1);
|
||||
eventType = afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon;
|
||||
} else if (line.startsWith('data: ') || line.startsWith('data:')) {
|
||||
const afterColon = line.slice(line.indexOf(':') + 1);
|
||||
dataLines.push(afterColon.startsWith(' ') ? afterColon.slice(1) : afterColon);
|
||||
}
|
||||
}
|
||||
|
||||
if (dataLines.length > 0) {
|
||||
events.push({
|
||||
event: eventType,
|
||||
data: dataLines.join('\n'),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return { events, remaining };
|
||||
}
|
||||
|
||||
// ── Accumulator Factories ──
|
||||
|
||||
export function createOpenAIStreamAccumulator(): OpenAIStreamAccumulator {
|
||||
return { toolCalls: new Map() };
|
||||
}
|
||||
|
||||
export function createAnthropicStreamAccumulator(): AnthropicStreamAccumulator {
|
||||
return { toolCalls: new Map(), thinkingBlocks: new Map() };
|
||||
}
|
||||
|
||||
// ── OpenAI/Mistral SSE Parser ──
|
||||
|
||||
/**
|
||||
* Parse a single OpenAI/Mistral SSE event and update the accumulator.
|
||||
*
|
||||
* OpenAI streaming format:
|
||||
* - Text deltas: choices[0].delta.content
|
||||
* - Tool call start: delta.tool_calls[i] with id + function.name
|
||||
* - Tool call fragments: delta.tool_calls[i].function.arguments (append)
|
||||
* - Finish reason: choices[0].finish_reason
|
||||
* - Usage: usage object in final chunk (requires stream_options.include_usage)
|
||||
* - [DONE] sentinel: stop iteration
|
||||
*/
|
||||
export function parseOpenAIStreamEvent(
|
||||
event: SSEEvent,
|
||||
accumulator: OpenAIStreamAccumulator,
|
||||
): StreamEventResult {
|
||||
// Handle [DONE] sentinel
|
||||
if (event.data === '[DONE]') {
|
||||
return { done: true };
|
||||
}
|
||||
|
||||
let data: Record<string, unknown>;
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
data = JSON.parse(event.data) as any;
|
||||
} catch {
|
||||
// Skip corrupted SSE events (e.g. partial JSON from TCP split)
|
||||
return { done: false };
|
||||
}
|
||||
const choice = (data as any).choices?.[0];
|
||||
const result: StreamEventResult = { done: false };
|
||||
|
||||
if (choice) {
|
||||
const delta = choice.delta;
|
||||
|
||||
// Text content delta
|
||||
if (delta?.content && delta.content.length > 0) {
|
||||
result.textDelta = delta.content;
|
||||
}
|
||||
|
||||
// Tool calls
|
||||
if (delta?.tool_calls) {
|
||||
for (const tc of delta.tool_calls) {
|
||||
const idx = tc.index;
|
||||
const existing = accumulator.toolCalls.get(idx);
|
||||
|
||||
if (tc.id || tc.function?.name) {
|
||||
// New tool call or update
|
||||
if (!existing) {
|
||||
accumulator.toolCalls.set(idx, {
|
||||
id: tc.id || '',
|
||||
name: tc.function?.name || '',
|
||||
arguments: tc.function?.arguments || '',
|
||||
});
|
||||
} else {
|
||||
if (tc.id) existing.id = tc.id;
|
||||
if (tc.function?.name) existing.name = tc.function.name;
|
||||
if (tc.function?.arguments) existing.arguments += tc.function.arguments;
|
||||
}
|
||||
} else if (existing && tc.function?.arguments) {
|
||||
// Append argument fragment
|
||||
existing.arguments += tc.function.arguments;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Finish reason
|
||||
if (choice.finish_reason) {
|
||||
result.finishReason = choice.finish_reason;
|
||||
}
|
||||
}
|
||||
|
||||
// Token usage (arrives in final chunk with stream_options.include_usage)
|
||||
if ((data as any).usage) {
|
||||
const usage = (data as any).usage;
|
||||
const promptDetails = usage.prompt_tokens_details;
|
||||
result.usage = {
|
||||
promptTokens: usage.prompt_tokens,
|
||||
completionTokens: usage.completion_tokens,
|
||||
totalTokens: usage.total_tokens,
|
||||
cacheReadTokens: promptDetails?.cached_tokens,
|
||||
};
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// ── Anthropic SSE Parser ──
|
||||
|
||||
/**
|
||||
* Parse a single Anthropic SSE event and update the accumulator.
|
||||
*
|
||||
* Anthropic streaming format uses named event types:
|
||||
* - message_start: input token usage
|
||||
* - content_block_start: text, tool_use, or thinking block begins
|
||||
* - content_block_delta: text_delta, input_json_delta, or thinking_delta
|
||||
* - content_block_stop: block ends
|
||||
* - message_delta: output tokens + stop_reason
|
||||
* - message_stop: stream complete
|
||||
* - ping: keep-alive (ignored)
|
||||
* - error: server error mid-stream
|
||||
*/
|
||||
export function parseAnthropicStreamEvent(
|
||||
event: SSEEvent,
|
||||
accumulator: AnthropicStreamAccumulator,
|
||||
): StreamEventResult {
|
||||
let data: Record<string, unknown>;
|
||||
try {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
data = JSON.parse(event.data) as any;
|
||||
} catch {
|
||||
// Skip corrupted SSE events (e.g. partial JSON from TCP split)
|
||||
return { done: false };
|
||||
}
|
||||
const result: StreamEventResult = { done: false };
|
||||
|
||||
switch (event.event) {
|
||||
case 'message_start': {
|
||||
const usage = (data as any).message?.usage;
|
||||
if (usage) {
|
||||
result.usage = {
|
||||
inputTokens: usage.input_tokens || 0,
|
||||
cacheReadTokens: usage.cache_read_input_tokens || 0,
|
||||
cacheWriteTokens: usage.cache_creation_input_tokens || 0,
|
||||
};
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'content_block_start': {
|
||||
const block = (data as any).content_block;
|
||||
if (block?.type === 'tool_use') {
|
||||
accumulator.toolCalls.set(data.index as number, {
|
||||
id: block.id,
|
||||
name: block.name,
|
||||
arguments: '',
|
||||
});
|
||||
} else if (block?.type === 'thinking') {
|
||||
accumulator.thinkingBlocks.set(data.index as number, { text: '' });
|
||||
}
|
||||
// text block start is a no-op (empty initial text)
|
||||
break;
|
||||
}
|
||||
|
||||
case 'content_block_delta': {
|
||||
const delta = (data as any).delta;
|
||||
if (delta?.type === 'text_delta' && delta.text) {
|
||||
result.textDelta = delta.text;
|
||||
} else if (delta?.type === 'input_json_delta' && delta.partial_json) {
|
||||
const tc = accumulator.toolCalls.get(data.index as number);
|
||||
if (tc) {
|
||||
tc.arguments += delta.partial_json;
|
||||
}
|
||||
} else if (delta?.type === 'thinking_delta' && delta.thinking) {
|
||||
const tb = accumulator.thinkingBlocks.get(data.index as number);
|
||||
if (tb) {
|
||||
tb.text += delta.thinking;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'content_block_stop': {
|
||||
// Block is complete. Tool arguments can now be parsed by the caller.
|
||||
// For thinking blocks, capture the signature (required by Anthropic when replaying thinking blocks).
|
||||
const stopBlock = (data as any).content_block;
|
||||
if (stopBlock?.type === 'thinking' && stopBlock.signature) {
|
||||
const tb = accumulator.thinkingBlocks.get(data.index as number);
|
||||
if (tb) {
|
||||
tb.signature = stopBlock.signature;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'message_delta': {
|
||||
if ((data as any).usage) {
|
||||
result.usage = {
|
||||
outputTokens: (data as any).usage.output_tokens || 0,
|
||||
};
|
||||
}
|
||||
if ((data as any).delta?.stop_reason) {
|
||||
result.finishReason = (data as any).delta.stop_reason;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'message_stop':
|
||||
result.done = true;
|
||||
break;
|
||||
|
||||
case 'ping':
|
||||
// Keep-alive, ignore
|
||||
break;
|
||||
|
||||
case 'error': {
|
||||
const errorMsg = (data as any).error?.message || 'Unknown streaming error';
|
||||
throw new Error(errorMsg);
|
||||
}
|
||||
|
||||
default:
|
||||
// Unknown event type, ignore
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// ── Retry with Exponential Backoff ──
|
||||
|
||||
const RETRYABLE_STATUS_CODES = new Set([429, 502, 503]);
|
||||
|
||||
/**
|
||||
* Retry a function with exponential backoff for transient HTTP errors.
|
||||
*
|
||||
* Retries on 429 (rate limit), 502 (bad gateway), 503 (service unavailable).
|
||||
* Also retries errors without a statusCode (e.g. ECONNRESET, EPIPE) since
|
||||
* these indicate transient network failures during connection.
|
||||
*
|
||||
* Does NOT retry on other 4xx errors (400, 401, 403 — client errors) or abort.
|
||||
* Respects Retry-After header for 429 responses.
|
||||
*
|
||||
* Best practice: wrap only the HTTP connection (httpRequestStream) in withRetry,
|
||||
* NOT the event processing loop. This ensures onDelta callbacks are never
|
||||
* called twice for the same text on retry.
|
||||
*/
|
||||
export async function withRetry<T>(
|
||||
fn: () => Promise<T>,
|
||||
options: { maxRetries?: number; onRetry?: (attempt: number, error: Error) => void; signal?: AbortSignal } = {},
|
||||
): Promise<T> {
|
||||
const maxRetries = options.maxRetries ?? 3;
|
||||
let lastError: Error | undefined;
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (error) {
|
||||
lastError = error as Error;
|
||||
const httpError = error as HttpStreamError;
|
||||
|
||||
// Don't retry on abort
|
||||
if (httpError.isAbort || httpError.message === 'Request cancelled') {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Check signal before retrying
|
||||
if (options.signal?.aborted) {
|
||||
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||
abortError.isAbort = true;
|
||||
throw abortError;
|
||||
}
|
||||
|
||||
// Don't retry on non-retryable status codes
|
||||
if (httpError.statusCode && !RETRYABLE_STATUS_CODES.has(httpError.statusCode)) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Don't retry if we've exhausted retries
|
||||
if (attempt >= maxRetries) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Calculate delay with exponential backoff and jitter
|
||||
const baseDelay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s
|
||||
const jitter = Math.random() * 500;
|
||||
let delay = baseDelay + jitter;
|
||||
|
||||
// Respect Retry-After header for 429
|
||||
if (httpError.retryAfter && httpError.retryAfter > 0) {
|
||||
delay = Math.max(delay, httpError.retryAfter * 1000);
|
||||
}
|
||||
|
||||
if (options.onRetry) {
|
||||
options.onRetry(attempt + 1, lastError);
|
||||
}
|
||||
|
||||
// Abort-aware delay: reject immediately if signal fires during wait
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(resolve, delay);
|
||||
if (options.signal) {
|
||||
const onAbort = () => {
|
||||
clearTimeout(timer);
|
||||
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||
abortError.isAbort = true;
|
||||
reject(abortError);
|
||||
};
|
||||
if (options.signal.aborted) {
|
||||
clearTimeout(timer);
|
||||
const abortError: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||
abortError.isAbort = true;
|
||||
reject(abortError);
|
||||
return;
|
||||
}
|
||||
options.signal.addEventListener('abort', onAbort, { once: true });
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
// ── HTTP Streaming Request ──
|
||||
|
||||
interface HttpStreamOptions {
|
||||
method?: string;
|
||||
headers?: Record<string, string>;
|
||||
body?: string;
|
||||
signal?: AbortSignal;
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make an HTTP request that returns an async iterable of SSE events.
|
||||
*
|
||||
* Uses Node.js http/https modules directly, reading the response
|
||||
* as a readable stream and parsing SSE events incrementally.
|
||||
*
|
||||
* On non-2xx status: collects the error body and throws.
|
||||
* Supports AbortSignal for cancellation.
|
||||
*/
|
||||
export function httpRequestStream(
|
||||
urlStr: string,
|
||||
options: HttpStreamOptions,
|
||||
): Promise<{
|
||||
statusCode: number;
|
||||
events: AsyncIterable<SSEEvent>;
|
||||
}> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const url = new URL(urlStr);
|
||||
const protocol = url.protocol === 'https:' ? https : http;
|
||||
const timeout = options.timeout ?? 120000;
|
||||
|
||||
const req = protocol.request(url, {
|
||||
method: options.method || 'POST',
|
||||
headers: options.headers || {},
|
||||
timeout,
|
||||
}, (res) => {
|
||||
const statusCode = res.statusCode || 0;
|
||||
|
||||
// Non-2xx: collect error body and throw
|
||||
if (statusCode < 200 || statusCode >= 300) {
|
||||
let errorBody = '';
|
||||
res.on('data', (chunk: Buffer) => { errorBody += chunk; });
|
||||
res.on('end', () => {
|
||||
const error: HttpStreamError = new Error(`API error: ${statusCode}`) as HttpStreamError;
|
||||
error.statusCode = statusCode;
|
||||
|
||||
// Parse Retry-After for 429
|
||||
if (statusCode === 429) {
|
||||
const retryAfter = res.headers['retry-after'];
|
||||
if (retryAfter) {
|
||||
const seconds = parseInt(retryAfter, 10);
|
||||
if (!isNaN(seconds)) {
|
||||
error.retryAfter = seconds;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try to extract a better error message
|
||||
try {
|
||||
const parsed = JSON.parse(errorBody);
|
||||
error.message = parsed.error?.message || parsed.message || error.message;
|
||||
} catch {
|
||||
if (errorBody.length > 0) {
|
||||
error.message = `${error.message}: ${errorBody.slice(0, 200)}`;
|
||||
}
|
||||
}
|
||||
reject(error);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// 2xx: create async iterable of SSE events
|
||||
const events: AsyncIterable<SSEEvent> = {
|
||||
[Symbol.asyncIterator]() {
|
||||
let buffer = '';
|
||||
let done = false;
|
||||
let pendingError: Error | null = null;
|
||||
const eventQueue: SSEEvent[] = [];
|
||||
let resolveNext: ((value: IteratorResult<SSEEvent>) => void) | null = null;
|
||||
let rejectNext: ((error: Error) => void) | null = null;
|
||||
|
||||
res.on('data', (chunk: Buffer) => {
|
||||
buffer += chunk.toString('utf-8');
|
||||
const { events: parsed, remaining } = parseSSELines(buffer);
|
||||
buffer = remaining;
|
||||
|
||||
for (const event of parsed) {
|
||||
if (resolveNext) {
|
||||
const resolve = resolveNext;
|
||||
resolveNext = null;
|
||||
rejectNext = null;
|
||||
resolve({ value: event, done: false });
|
||||
} else {
|
||||
eventQueue.push(event);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
res.on('end', () => {
|
||||
done = true;
|
||||
if (resolveNext) {
|
||||
const resolve = resolveNext;
|
||||
resolveNext = null;
|
||||
rejectNext = null;
|
||||
resolve({ value: undefined as unknown as SSEEvent, done: true });
|
||||
}
|
||||
});
|
||||
|
||||
res.on('error', (err: Error) => {
|
||||
done = true;
|
||||
if (rejectNext) {
|
||||
const reject = rejectNext;
|
||||
resolveNext = null;
|
||||
rejectNext = null;
|
||||
reject(err);
|
||||
} else {
|
||||
// Store error for next .next() call so it's not silently swallowed
|
||||
pendingError = err;
|
||||
}
|
||||
});
|
||||
|
||||
return {
|
||||
next(): Promise<IteratorResult<SSEEvent>> {
|
||||
// Return queued event immediately
|
||||
if (eventQueue.length > 0) {
|
||||
return Promise.resolve({ value: eventQueue.shift()!, done: false });
|
||||
}
|
||||
|
||||
// Throw stored error from a previous event that fired with no consumer waiting
|
||||
if (pendingError) {
|
||||
const err = pendingError;
|
||||
pendingError = null;
|
||||
return Promise.reject(err);
|
||||
}
|
||||
|
||||
// Stream already ended
|
||||
if (done) {
|
||||
return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });
|
||||
}
|
||||
|
||||
// Wait for next event
|
||||
return new Promise<IteratorResult<SSEEvent>>((resolve, reject) => {
|
||||
resolveNext = resolve;
|
||||
rejectNext = reject;
|
||||
});
|
||||
},
|
||||
return(): Promise<IteratorResult<SSEEvent>> {
|
||||
// Called when for-await-of exits early (break, return, throw).
|
||||
// Destroy the response stream to free the socket immediately.
|
||||
done = true;
|
||||
res.destroy();
|
||||
return Promise.resolve({ value: undefined as unknown as SSEEvent, done: true });
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
resolve({ statusCode, events });
|
||||
});
|
||||
|
||||
req.on('error', (err: Error) => {
|
||||
const error: HttpStreamError = err as HttpStreamError;
|
||||
if (options.signal?.aborted) {
|
||||
error.isAbort = true;
|
||||
}
|
||||
reject(error);
|
||||
});
|
||||
|
||||
req.on('timeout', () => {
|
||||
req.destroy();
|
||||
reject(new Error('Request timed out'));
|
||||
});
|
||||
|
||||
if (options.signal) {
|
||||
if (options.signal.aborted) {
|
||||
req.destroy();
|
||||
const error: HttpStreamError = new Error('Request cancelled') as HttpStreamError;
|
||||
error.isAbort = true;
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
options.signal.addEventListener('abort', () => {
|
||||
req.destroy();
|
||||
}, { once: true });
|
||||
}
|
||||
|
||||
if (options.body) {
|
||||
req.write(options.body);
|
||||
}
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
1598
tests/engine/streaming.test.ts
Normal file
1598
tests/engine/streaming.test.ts
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user