fix: implemented TD-06 real SSE implementation

This commit is contained in:
2026-06-11 16:37:08 +02:00
parent a5391e8e25
commit 9325de2db4
9 changed files with 991 additions and 15 deletions

View File

@@ -238,10 +238,23 @@ batching speedup.
## Phase 2 — Unbounded blocking & cancellation ## Phase 2 — Unbounded blocking & cancellation
### TD-06: Real SSE streaming for chat ### TD-06: Real SSE streaming for chat ✅ DONE (2026-06-11)
**Depends on TD-02 (Req).** **Depends on TD-02 (Req).**
**Status: implemented.** Chat requests now send `"stream": true` (+
`stream_options.include_usage`) and consume the SSE response incrementally
via `HttpClient.post_stream/5` (Req `into:`). `BDS.AI.SSE` assembles content
deltas, tool-call fragments, and usage, emitting **cumulative content
snapshots** throttled to `stream_emit_interval_ms` (default 100ms) — replace
semantics, so the chat editor needed no changes and tool rounds reset
naturally. Streaming applies only to `operation: :chat` with an `:on_stream`
callback, can be disabled via `config :bds, :chat, streaming: false`, and
providers that ignore the stream flag are auto-detected by content-type and
parsed as plain JSON. Cancellation kills the chat task, which aborts the
underlying connection (server-observed in tests). Persistence semantics are
unchanged (one assistant row per round, same usage normalization).
**Context.** `OpenAICompatibleRuntime.generate/3` never sets `"stream": true`; **Context.** `OpenAICompatibleRuntime.generate/3` never sets `"stream": true`;
the UI's `{:chat_streaming_content, ...}` event fires exactly once with the the UI's `{:chat_streaming_content, ...}` event fires exactly once with the
complete response, i.e. streaming is fake. For local models this is the complete response, i.e. streaming is fake. For local models this is the

View File

@@ -67,7 +67,13 @@ config :bds, :scripting,
transform_max_toasts_total: 20, transform_max_toasts_total: 20,
transform_max_toast_length: 300 transform_max_toast_length: 300
config :bds, :chat, max_tool_rounds: 10 # streaming: chat completions use SSE when the provider supports it (set to
# false for OpenAI-compatible servers that reject the "stream" flag).
# stream_emit_interval_ms throttles how often streamed content reaches the UI.
config :bds, :chat,
max_tool_rounds: 10,
streaming: true,
stream_emit_interval_ms: 100
config :bds, :embeddings, config :bds, :embeddings,
backend: BDS.Embeddings.Backends.Neural, backend: BDS.Embeddings.Backends.Neural,

View File

@@ -565,9 +565,10 @@ defmodule BDS.AI.Chat do
rounds_left rounds_left
) do ) do
request = build_chat_request(conversation, messages, model, project_id, tools) request = build_chat_request(conversation, messages, model, project_id, tools)
generate_opts = put_stream_callback(opts, conversation.id)
with {:ok, response} <- with {:ok, response} <-
runtime.generate(Runtime.endpoint_with_model(endpoint, model), request, opts), runtime.generate(Runtime.endpoint_with_model(endpoint, model), request, generate_opts),
{:ok, assistant_message} <- persist_assistant_response(conversation.id, response), {:ok, assistant_message} <- persist_assistant_response(conversation.id, response),
:ok <- touch_conversation(conversation.id) do :ok <- touch_conversation(conversation.id) do
if is_binary(Map.get(response, :content)) and String.trim(Map.get(response, :content)) != "" do if is_binary(Map.get(response, :content)) and String.trim(Map.get(response, :content)) != "" do
@@ -921,6 +922,26 @@ defmodule BDS.AI.Chat do
end end
end end
# When someone is listening for chat events, ask the runtime to stream:
# it emits cumulative content snapshots, which the editor renders with
# replace semantics. The full-content notify after each round stays the
# authoritative final state (and the only event for non-streaming runtimes).
defp put_stream_callback(opts, conversation_id) do
case Keyword.get(opts, :event_target) do
nil ->
opts
_target ->
Keyword.put(opts, :on_stream, fn %{content: content} ->
if is_binary(content) and String.trim(content) != "" do
notify_chat_event(opts, {:chat_streaming_content, conversation_id, content})
end
:ok
end)
end
end
defp notify_chat_event(opts, event) do defp notify_chat_event(opts, event) do
case Keyword.get(opts, :event_target) do case Keyword.get(opts, :event_target) do
pid when is_pid(pid) -> send(pid, event) pid when is_pid(pid) -> send(pid, event)

View File

@@ -59,6 +59,62 @@ defmodule BDS.AI.HttpClient do
|> normalize_result() |> normalize_result()
end end
@doc """
Streaming POST: body chunks of a 200 response are folded into `acc` via
`reducer.(chunk, acc)` as they arrive; non-200 bodies are collected whole
for error reporting. Returns the final accumulator alongside the response.
Never retried (same reasoning as `post/3`), and `accept-encoding` is
disabled so event-stream chunks arrive uncompressed. The request runs in
the calling process — killing that process aborts the underlying
connection, which is what makes mid-stream chat cancellation work.
"""
@spec post_stream(String.t(), %{String.t() => String.t()}, binary(), acc, (binary(), acc ->
acc)) ::
{:ok, %{status: non_neg_integer(), headers: map(), body: binary()}, acc}
| {:error, term()}
when acc: term()
def post_stream(url, headers, body, acc, reducer)
when is_binary(url) and is_map(headers) and is_binary(body) and is_function(reducer, 2) do
into = fn {:data, data}, {req, resp} ->
resp =
if resp.status == 200 do
next_acc = reducer.(data, Req.Response.get_private(resp, :bds_stream_acc, acc))
Req.Response.put_private(resp, :bds_stream_acc, next_acc)
else
%{resp | body: collected_body(resp.body) <> data}
end
{:cont, {req, resp}}
end
[
method: :post,
url: url,
headers: headers,
body: body,
retry: false,
compressed: false,
into: into
]
|> Keyword.merge(base_options())
|> Req.request()
|> case do
{:ok, %Req.Response{} = resp} ->
{:ok, %{status: resp.status, headers: normalize_headers(resp.headers), body: collected_body(resp.body)},
Req.Response.get_private(resp, :bds_stream_acc, acc)}
{:error, %Req.TransportError{reason: reason}} ->
{:error, reason}
{:error, reason} ->
{:error, reason}
end
end
defp collected_body(body) when is_binary(body), do: body
defp collected_body(_body), do: ""
defp base_options do defp base_options do
[ [
connect_options: [timeout: config(:connect_timeout_ms, @default_connect_timeout_ms)], connect_options: [timeout: config(:connect_timeout_ms, @default_connect_timeout_ms)],

View File

@@ -4,6 +4,7 @@ defmodule BDS.AI.OpenAICompatibleRuntime do
require Logger require Logger
alias BDS.AI.HttpClient alias BDS.AI.HttpClient
alias BDS.AI.SSE
def list_models(endpoint, opts \\ []) when is_map(endpoint) and is_list(opts) do def list_models(endpoint, opts \\ []) when is_map(endpoint) and is_list(opts) do
http_client = Keyword.get(opts, :http_client, HttpClient) http_client = Keyword.get(opts, :http_client, HttpClient)
@@ -22,7 +23,7 @@ defmodule BDS.AI.OpenAICompatibleRuntime do
end end
end end
def generate(endpoint, request, _opts) when is_map(endpoint) and is_map(request) do def generate(endpoint, request, opts) when is_map(endpoint) and is_map(request) do
url = completions_url(endpoint.url) url = completions_url(endpoint.url)
headers = headers =
@@ -41,6 +42,14 @@ defmodule BDS.AI.OpenAICompatibleRuntime do
|> maybe_disable_thinking(request.model) |> maybe_disable_thinking(request.model)
|> maybe_put_tools(Map.get(request, :tools, [])) |> maybe_put_tools(Map.get(request, :tools, []))
if stream?(request, opts) do
generate_streaming(url, headers, payload, request, Keyword.fetch!(opts, :on_stream))
else
generate_blocking(url, headers, payload, request)
end
end
defp generate_blocking(url, headers, payload, request) do
payload_json = Jason.encode!(payload) payload_json = Jason.encode!(payload)
Logger.debug( Logger.debug(
@@ -81,6 +90,81 @@ defmodule BDS.AI.OpenAICompatibleRuntime do
end end
end end
# Streaming variant: same request payload plus stream flags; SSE chunks are
# folded into a BDS.AI.SSE assembler that emits cumulative content
# snapshots to `on_stream` as they arrive. The assembled message goes
# through the same normalization as the blocking path.
defp generate_streaming(url, headers, payload, request, on_stream) do
payload_json =
payload
|> Map.put("stream", true)
|> Map.put("stream_options", %{"include_usage" => true})
|> Jason.encode!()
Logger.debug(
"AI OpenAI-compatible streaming request operation=#{inspect(Map.get(request, :operation))} model=#{inspect(request.model)} url=#{url} payload_size=#{byte_size(payload_json)}"
)
sse = SSE.new(on_stream, emit_interval_ms: stream_emit_interval_ms())
case HttpClient.post_stream(url, headers, payload_json, sse, fn chunk, acc ->
SSE.feed(acc, chunk)
end) do
{:ok, %{status: 200, headers: response_headers}, sse} ->
if event_stream?(response_headers) do
assembled = SSE.finish(sse)
{:ok,
%{
content: assembled.content,
json: decode_json_content(assembled.content),
tool_calls: normalize_tool_calls(assembled.tool_calls),
usage: normalize_usage(assembled.usage || %{})
}}
else
# The provider ignored the stream flag and sent a plain completion.
normalize_response(SSE.raw_body(sse))
end
{:ok, %{status: status, body: body}, _sse} ->
Logger.error(
"AI OpenAI-compatible streaming HTTP error status=#{status} body=#{String.slice(body, 0, 2000)}"
)
{:error, %{kind: :http_error, status: status, body: body}}
{:error, reason} ->
Logger.error("AI OpenAI-compatible streaming request failed: #{inspect(reason)}")
{:error, %{kind: :http_error, reason: reason}}
end
end
# Streaming is opt-in per request (the caller passes :on_stream), limited
# to interactive chat, and can be disabled globally for providers that do
# not support SSE (config :bds, :chat, streaming: false).
defp stream?(request, opts) do
Map.get(request, :operation) == :chat and
is_function(Keyword.get(opts, :on_stream), 1) and
chat_config(:streaming, true)
end
defp stream_emit_interval_ms, do: chat_config(:stream_emit_interval_ms, 100)
defp event_stream?(headers) do
case headers["content-type"] do
content_type when is_binary(content_type) ->
String.contains?(content_type, "text/event-stream")
_missing ->
# No content type: trust the request we made and parse as SSE.
true
end
end
defp chat_config(key, default) do
:bds |> Application.get_env(:chat, []) |> Keyword.get(key, default)
end
defp normalize_response(body) do defp normalize_response(body) do
with {:ok, payload} <- decode_json_body(body) do with {:ok, payload} <- decode_json_body(body) do
message = get_in(payload, ["choices", Access.at(0), "message"]) || %{} message = get_in(payload, ["choices", Access.at(0), "message"]) || %{}
@@ -88,22 +172,25 @@ defmodule BDS.AI.OpenAICompatibleRuntime do
tool_calls = normalize_tool_calls(message["tool_calls"] || []) tool_calls = normalize_tool_calls(message["tool_calls"] || [])
usage = normalize_usage(payload["usage"] || %{}) usage = normalize_usage(payload["usage"] || %{})
json = {:ok,
case content do %{
nil -> content: content,
nil json: decode_json_content(content),
tool_calls: tool_calls,
usage: usage
}}
end
end
value when is_binary(value) -> defp decode_json_content(nil), do: nil
case Jason.decode(value) do
defp decode_json_content(content) when is_binary(content) do
case Jason.decode(content) do
{:ok, decoded} when is_map(decoded) -> decoded {:ok, decoded} when is_map(decoded) -> decoded
_other -> nil _other -> nil
end end
end end
{:ok, %{content: content, json: json, tool_calls: tool_calls, usage: usage}}
end
end
defp completions_url(url) do defp completions_url(url) do
cond do cond do
String.ends_with?(url, "/chat/completions") -> url String.ends_with?(url, "/chat/completions") -> url

176
lib/bds/ai/sse.ex Normal file
View File

@@ -0,0 +1,176 @@
defmodule BDS.AI.SSE do
@moduledoc """
Incremental assembler for OpenAI-compatible `text/event-stream` chat
completions.
Fed raw transport chunks via `feed/2`, it buffers partial events, decodes
`data:` payloads, and accumulates content deltas, tool-call fragments, and
usage. Content is reported to the optional `on_event` callback as
**cumulative snapshots** (`%{content: binary}`) — replace semantics, which
matches how the chat editor renders streaming state and resets naturally
between tool rounds. Emissions are throttled to `:emit_interval_ms`
(the first delta always emits immediately for perceived latency).
`finish/1` returns the assembled message in OpenAI wire shape so the
runtime can reuse its non-streaming normalization:
`%{content: binary | nil, tool_calls: [%{"id" => _, "function" => %{"name" => _, "arguments" => json_string}}], usage: map | nil}`.
"""
defstruct buffer: "",
raw: [],
content: [],
content?: false,
tool_calls: %{},
usage: nil,
done?: false,
on_event: nil,
emit_interval_ms: 100,
last_emit_at: nil
@type t :: %__MODULE__{}
@spec new((map() -> any()) | nil, keyword()) :: t()
def new(on_event \\ nil, opts \\ []) when is_list(opts) do
%__MODULE__{
on_event: on_event,
emit_interval_ms: Keyword.get(opts, :emit_interval_ms, 100)
}
end
@spec feed(t(), binary()) :: t()
def feed(%__MODULE__{done?: true} = sse, _chunk), do: sse
def feed(%__MODULE__{} = sse, chunk) when is_binary(chunk) do
sse = %{sse | raw: [chunk | sse.raw]}
parts = String.split(sse.buffer <> chunk, ~r/\r?\n\r?\n/)
{complete_events, [rest]} = Enum.split(parts, -1)
Enum.reduce(complete_events, %{sse | buffer: rest}, &process_event(&2, &1))
end
@doc """
The unparsed transport bytes, for callers that discover after the fact
that the response was not an event stream (e.g. a provider that ignored
the `stream` flag and answered with plain JSON).
"""
@spec raw_body(t()) :: binary()
def raw_body(%__MODULE__{} = sse) do
sse.raw |> Enum.reverse() |> IO.iodata_to_binary()
end
@spec finish(t()) :: %{content: binary() | nil, tool_calls: [map()], usage: map() | nil}
def finish(%__MODULE__{} = sse) do
# A final event may arrive without its trailing blank line.
sse =
case String.trim(sse.buffer) do
"" -> sse
remnant -> process_event(%{sse | buffer: ""}, remnant)
end
%{
content: assembled_content(sse),
tool_calls: assembled_tool_calls(sse),
usage: sse.usage
}
end
defp process_event(%{done?: true} = sse, _event), do: sse
defp process_event(sse, event) do
data =
event
|> String.split(~r/\r?\n/)
|> Enum.flat_map(&data_line/1)
|> Enum.join("\n")
cond do
data == "" ->
sse
String.trim(data) == "[DONE]" ->
%{sse | done?: true}
true ->
case Jason.decode(data) do
{:ok, payload} when is_map(payload) -> apply_payload(sse, payload)
_other -> sse
end
end
end
defp data_line("data: " <> rest), do: [rest]
defp data_line("data:" <> rest), do: [rest]
defp data_line(_line), do: []
defp apply_payload(sse, payload) do
delta = get_in(payload, ["choices", Access.at(0), "delta"]) || %{}
sse
|> apply_content(delta["content"])
|> apply_tool_calls(delta["tool_calls"])
|> apply_usage(payload["usage"])
end
defp apply_content(sse, content) when is_binary(content) and content != "" do
%{sse | content: [content | sse.content], content?: true}
|> maybe_emit()
end
defp apply_content(sse, _content), do: sse
defp apply_tool_calls(sse, [_ | _] = fragments) do
Enum.reduce(fragments, sse, fn fragment, acc ->
index = fragment["index"] || 0
existing = Map.get(acc.tool_calls, index, %{id: nil, name: nil, arguments: []})
function_part = fragment["function"] || %{}
merged = %{
id: existing.id || fragment["id"],
name: existing.name || function_part["name"],
arguments: [existing.arguments, function_part["arguments"] || ""]
}
%{acc | tool_calls: Map.put(acc.tool_calls, index, merged)}
end)
end
defp apply_tool_calls(sse, _fragments), do: sse
defp apply_usage(sse, usage) when is_map(usage) and map_size(usage) > 0,
do: %{sse | usage: usage}
defp apply_usage(sse, _usage), do: sse
defp maybe_emit(%{on_event: nil} = sse), do: sse
defp maybe_emit(sse) do
now = System.monotonic_time(:millisecond)
if is_nil(sse.last_emit_at) or now - sse.last_emit_at >= sse.emit_interval_ms do
sse.on_event.(%{content: assembled_content(sse) || ""})
%{sse | last_emit_at: now}
else
sse
end
end
defp assembled_content(%{content?: false}), do: nil
defp assembled_content(sse) do
sse.content |> Enum.reverse() |> IO.iodata_to_binary()
end
defp assembled_tool_calls(sse) do
sse.tool_calls
|> Enum.sort_by(fn {index, _tool_call} -> index end)
|> Enum.map(fn {_index, tool_call} ->
%{
"id" => tool_call.id,
"function" => %{
"name" => tool_call.name,
"arguments" => IO.iodata_to_binary(tool_call.arguments)
}
}
end)
end
end

View File

@@ -0,0 +1,164 @@
defmodule BDS.AI.ChatStreamingTest do
use ExUnit.Case, async: false
defmodule StreamingChatPlug do
import Plug.Conn
def init(opts), do: opts
def call(conn, _opts) do
{:ok, body, conn} = read_body(conn)
payload = Jason.decode!(body)
if payload["stream"] == true do
stream_chat(conn)
else
# Chat-title generation and other one-shot requests stay non-streaming.
conn
|> put_resp_content_type("application/json")
|> send_resp(
200,
Jason.encode!(%{
"choices" => [%{"message" => %{"content" => "Story Time"}}],
"usage" => %{"prompt_tokens" => 1, "completion_tokens" => 1}
})
)
end
end
defp stream_chat(conn) do
conn =
conn
|> put_resp_content_type("text/event-stream")
|> send_chunked(200)
case Application.get_env(:bds, :chat_stream_scenario, :short) do
:short -> stream_short(conn)
:endless -> stream_endless(conn)
end
end
defp stream_short(conn) do
events =
[
delta_event(%{"content" => "Once"}),
delta_event(%{"content" => " upon"}),
delta_event(%{"content" => " a time"}),
"data: " <>
Jason.encode!(%{
"choices" => [],
"usage" => %{"prompt_tokens" => 9, "completion_tokens" => 4}
}) <> "\n\n",
"data: [DONE]\n\n"
]
Enum.reduce_while(events, conn, fn event, conn ->
case chunk(conn, event) do
{:ok, conn} -> {:cont, conn}
{:error, _reason} -> {:halt, conn}
end
end)
end
defp stream_endless(conn) do
case chunk(conn, delta_event(%{"content" => "tick "})) do
{:ok, conn} ->
Process.sleep(50)
stream_endless(conn)
{:error, _reason} ->
send(test_pid(), :sse_client_disconnected)
conn
end
end
defp delta_event(delta) do
"data: " <> Jason.encode!(%{"choices" => [%{"delta" => delta}]}) <> "\n\n"
end
defp test_pid, do: Application.get_env(:bds, :chat_stream_test_pid)
end
setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo)
Application.put_env(:bds, :chat_stream_test_pid, self())
Application.put_env(:bds, :chat_stream_scenario, :short)
original_chat = Application.fetch_env(:bds, :chat)
Application.put_env(
:bds,
:chat,
Keyword.merge(Application.get_env(:bds, :chat, []), stream_emit_interval_ms: 0)
)
on_exit(fn ->
Application.delete_env(:bds, :chat_stream_scenario)
case original_chat do
{:ok, value} -> Application.put_env(:bds, :chat, value)
:error -> Application.delete_env(:bds, :chat)
end
end)
server = start_supervised!({Bandit, plug: StreamingChatPlug, port: 0, startup_log: false})
{:ok, {_address, port}} = ThousandIsland.listener_info(server)
assert {:ok, _endpoint} =
BDS.AI.put_endpoint(:online, %{
url: "http://127.0.0.1:#{port}/v1",
api_key: "sk-stream",
model: "stream-model"
})
assert :ok = BDS.AI.set_airplane_mode(false)
assert {:ok, conversation} = BDS.AI.start_chat(%{model: "stream-model"})
{:ok, conversation: conversation}
end
test "incremental content events arrive before the final reply and persistence matches", %{
conversation: conversation
} do
conversation_id = conversation.id
assert {:ok, reply} =
BDS.AI.send_chat_message(conversation_id, "tell me a story",
event_target: self()
)
assert reply.assistant_message.content == "Once upon a time"
assert_received {:chat_streaming_content, ^conversation_id, "Once"}
assert_received {:chat_streaming_content, ^conversation_id, "Once upon"}
assert_received {:chat_streaming_content, ^conversation_id, "Once upon a time"}
messages = BDS.AI.list_chat_messages(conversation_id)
assistant_message = List.last(messages)
assert assistant_message.role == :assistant
assert assistant_message.content == "Once upon a time"
assert assistant_message.token_usage_input == 9
assert assistant_message.token_usage_output == 4
end
test "cancel_chat mid-stream aborts the HTTP request", %{conversation: conversation} do
Application.put_env(:bds, :chat_stream_scenario, :endless)
conversation_id = conversation.id
test_pid = self()
task =
Task.async(fn ->
BDS.AI.send_chat_message(conversation_id, "stream forever", event_target: test_pid)
end)
# Wait until tokens are actually flowing before cancelling.
assert_receive {:chat_streaming_content, ^conversation_id, _content}, 2_000
assert :ok = BDS.AI.cancel_chat(conversation_id)
assert {:error, :cancelled} = Task.await(task)
# The server notices the closed connection — the request was truly aborted.
assert_receive :sse_client_disconnected, 2_000
end
end

View File

@@ -0,0 +1,253 @@
defmodule BDS.AI.OpenAICompatibleRuntimeStreamingTest do
use ExUnit.Case, async: false
alias BDS.AI.OpenAICompatibleRuntime
defmodule SSEPlug do
import Plug.Conn
def init(opts), do: opts
def call(conn, _opts) do
{:ok, body, conn} = read_body(conn)
payload = Jason.decode!(body)
send(test_pid(), {:endpoint_request, payload})
respond(conn, payload["model"], payload)
end
defp respond(conn, "stream-content", %{"stream" => true}) do
stream(conn, [
delta_event(%{"role" => "assistant", "content" => ""}),
delta_event(%{"content" => "Once"}),
delta_event(%{"content" => " upon"}),
delta_event(%{"content" => " a time"}),
~s(data: ) <>
Jason.encode!(%{
"choices" => [],
"usage" => %{"prompt_tokens" => 7, "completion_tokens" => 3}
}) <> "\n\n",
"data: [DONE]\n\n"
])
end
defp respond(conn, "stream-tools", %{"stream" => true}) do
stream(conn, [
delta_event(%{
"tool_calls" => [
%{
"index" => 0,
"id" => "call-1",
"function" => %{"name" => "search_posts", "arguments" => ""}
}
]
}),
delta_event(%{
"tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "{\"query\":"}}]
}),
delta_event(%{
"tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "\"sun\"}"}}]
}),
"data: [DONE]\n\n"
])
end
defp respond(conn, "stream-error", %{"stream" => true}) do
send_resp(conn, 503, ~s({"error":"overloaded"}))
end
# Simulates a provider that ignores the "stream" flag and answers with a
# plain JSON completion.
defp respond(conn, "ignores-stream", %{"stream" => true}) do
conn
|> put_resp_content_type("application/json")
|> send_resp(
200,
Jason.encode!(%{
"choices" => [%{"message" => %{"content" => "plain json despite stream"}}],
"usage" => %{"prompt_tokens" => 5, "completion_tokens" => 2}
})
)
end
defp respond(conn, _model, _payload) do
conn
|> put_resp_content_type("application/json")
|> send_resp(
200,
Jason.encode!(%{
"choices" => [%{"message" => %{"content" => "non-streaming reply"}}],
"usage" => %{"prompt_tokens" => 1, "completion_tokens" => 1}
})
)
end
defp delta_event(delta) do
"data: " <> Jason.encode!(%{"choices" => [%{"delta" => delta}]}) <> "\n\n"
end
defp stream(conn, events) do
conn =
conn
|> put_resp_content_type("text/event-stream")
|> send_chunked(200)
Enum.reduce_while(events, conn, fn event, conn ->
case chunk(conn, event) do
{:ok, conn} -> {:cont, conn}
{:error, _reason} -> {:halt, conn}
end
end)
end
defp test_pid, do: Application.get_env(:bds, :sse_plug_test_pid)
end
setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo)
Application.put_env(:bds, :sse_plug_test_pid, self())
original_chat = Application.fetch_env(:bds, :chat)
Application.put_env(
:bds,
:chat,
Keyword.merge(Application.get_env(:bds, :chat, []), stream_emit_interval_ms: 0)
)
on_exit(fn ->
case original_chat do
{:ok, value} -> Application.put_env(:bds, :chat, value)
:error -> Application.delete_env(:bds, :chat)
end
end)
server = start_supervised!({Bandit, plug: SSEPlug, port: 0, startup_log: false})
{:ok, {_address, port}} = ThousandIsland.listener_info(server)
{:ok, url: "http://127.0.0.1:#{port}/v1"}
end
defp chat_request(model) do
%{
operation: :chat,
model: model,
max_output_tokens: 64,
messages: [%{"role" => "user", "content" => "hello"}]
}
end
defp stream_collector do
test_pid = self()
fn event -> send(test_pid, {:stream_event, event}) end
end
test "generate streams cumulative content and returns the assembled response", %{url: url} do
assert {:ok, response} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("stream-content"),
on_stream: stream_collector()
)
assert response.content == "Once upon a time"
assert response.tool_calls == []
assert response.usage.input_tokens == 7
assert response.usage.output_tokens == 3
assert_received {:endpoint_request, payload}
assert payload["stream"] == true
assert payload["stream_options"] == %{"include_usage" => true}
assert_received {:stream_event, %{content: "Once"}}
assert_received {:stream_event, %{content: "Once upon"}}
assert_received {:stream_event, %{content: "Once upon a time"}}
end
test "generate assembles tool calls streamed as fragments", %{url: url} do
assert {:ok, response} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("stream-tools"),
on_stream: stream_collector()
)
assert response.content == nil
assert response.tool_calls == [
%{id: "call-1", name: "search_posts", arguments: %{"query" => "sun"}}
]
end
test "an error status during streaming surfaces as a structured error", %{url: url} do
assert {:error, %{kind: :http_error, status: 503}} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("stream-error"),
on_stream: stream_collector()
)
end
test "a provider that ignores the stream flag still produces a full response", %{url: url} do
assert {:ok, response} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("ignores-stream"),
on_stream: stream_collector()
)
assert response.content == "plain json despite stream"
assert response.usage.input_tokens == 5
assert response.usage.output_tokens == 2
end
test "streaming is skipped when disabled via config", %{url: url} do
Application.put_env(
:bds,
:chat,
Keyword.merge(Application.get_env(:bds, :chat, []), streaming: false)
)
assert {:ok, %{content: "non-streaming reply"}} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("any-model"),
on_stream: stream_collector()
)
assert_received {:endpoint_request, payload}
refute Map.has_key?(payload, "stream")
refute_received {:stream_event, _event}
end
test "streaming requires an on_stream callback", %{url: url} do
assert {:ok, %{content: "non-streaming reply"}} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
chat_request("any-model"),
[]
)
assert_received {:endpoint_request, payload}
refute Map.has_key?(payload, "stream")
end
test "non-chat operations never stream", %{url: url} do
request = %{
operation: :chat_title,
model: "any-model",
max_output_tokens: 32,
messages: [%{"role" => "user", "content" => "Topic: hello"}]
}
assert {:ok, %{content: "non-streaming reply"}} =
OpenAICompatibleRuntime.generate(
%{url: url, api_key: "sk-test"},
request,
on_stream: stream_collector()
)
assert_received {:endpoint_request, payload}
refute Map.has_key?(payload, "stream")
end
end

200
test/bds/ai/sse_test.exs Normal file
View File

@@ -0,0 +1,200 @@
defmodule BDS.AI.SSETest do
use ExUnit.Case, async: true
alias BDS.AI.SSE
defp chunk_event(payload), do: "data: " <> Jason.encode!(payload) <> "\n\n"
defp content_delta(text) do
%{"choices" => [%{"delta" => %{"content" => text}}]}
end
test "assembles content from deltas across separate chunks" do
sse = SSE.new(nil)
sse =
sse
|> SSE.feed(chunk_event(content_delta("Hel")))
|> SSE.feed(chunk_event(content_delta("lo ")))
|> SSE.feed(chunk_event(content_delta("world")))
|> SSE.feed("data: [DONE]\n\n")
assert %{content: "Hello world", tool_calls: [], usage: nil} = SSE.finish(sse)
end
test "handles events split across arbitrary chunk boundaries" do
raw =
chunk_event(content_delta("alpha ")) <>
chunk_event(content_delta("beta")) <> "data: [DONE]\n\n"
# Feed the byte stream in 7-byte slices to exercise buffering.
sse =
raw
|> :binary.bin_to_list()
|> Enum.chunk_every(7)
|> Enum.map(&:binary.list_to_bin/1)
|> Enum.reduce(SSE.new(nil), &SSE.feed(&2, &1))
assert %{content: "alpha beta"} = SSE.finish(sse)
end
test "supports CRLF line endings and data lines without a space" do
payload = Jason.encode!(content_delta("crlf"))
sse = SSE.feed(SSE.new(nil), "data:" <> payload <> "\r\n\r\ndata: [DONE]\r\n\r\n")
assert %{content: "crlf"} = SSE.finish(sse)
end
test "ignores comments, other fields, and undecodable data" do
sse =
SSE.new(nil)
|> SSE.feed(": keep-alive\n\n")
|> SSE.feed("event: message\nid: 7\n" <> "data: " <> Jason.encode!(content_delta("ok")) <> "\n\n")
|> SSE.feed("data: not-json\n\n")
assert %{content: "ok"} = SSE.finish(sse)
end
test "stops processing after [DONE]" do
sse =
SSE.new(nil)
|> SSE.feed(chunk_event(content_delta("kept")))
|> SSE.feed("data: [DONE]\n\n")
|> SSE.feed(chunk_event(content_delta(" dropped")))
assert %{content: "kept"} = SSE.finish(sse)
end
test "finishes a trailing event that lacks the final blank line" do
sse = SSE.feed(SSE.new(nil), "data: " <> Jason.encode!(content_delta("tail")))
assert %{content: "tail"} = SSE.finish(sse)
end
test "content is nil when the stream carried no content" do
sse = SSE.feed(SSE.new(nil), "data: [DONE]\n\n")
assert %{content: nil} = SSE.finish(sse)
end
test "assembles tool calls from fragments in OpenAI wire shape" do
fragments = [
%{
"choices" => [
%{
"delta" => %{
"tool_calls" => [
%{
"index" => 0,
"id" => "call-1",
"function" => %{"name" => "search_posts", "arguments" => ""}
}
]
}
}
]
},
%{
"choices" => [
%{
"delta" => %{
"tool_calls" => [
%{"index" => 0, "function" => %{"arguments" => "{\"query\":"}},
%{
"index" => 1,
"id" => "call-2",
"function" => %{"name" => "count_posts", "arguments" => "{}"}
}
]
}
}
]
},
%{
"choices" => [
%{
"delta" => %{
"tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "\"sun\"}"}}]
}
}
]
}
]
sse = Enum.reduce(fragments, SSE.new(nil), &SSE.feed(&2, chunk_event(&1)))
assert %{tool_calls: tool_calls} = SSE.finish(sse)
assert tool_calls == [
%{
"id" => "call-1",
"function" => %{"name" => "search_posts", "arguments" => ~s({"query":"sun"})}
},
%{"id" => "call-2", "function" => %{"name" => "count_posts", "arguments" => "{}"}}
]
end
test "captures usage from the final chunk" do
sse =
SSE.new(nil)
|> SSE.feed(chunk_event(content_delta("hi")))
|> SSE.feed(
chunk_event(%{"choices" => [], "usage" => %{"prompt_tokens" => 7, "completion_tokens" => 2}})
)
assert %{usage: %{"prompt_tokens" => 7, "completion_tokens" => 2}} = SSE.finish(sse)
end
test "emits cumulative content snapshots to the callback" do
test_pid = self()
sse = SSE.new(fn event -> send(test_pid, {:stream_event, event}) end, emit_interval_ms: 0)
sse
|> SSE.feed(chunk_event(content_delta("one")))
|> SSE.feed(chunk_event(content_delta(" two")))
assert_received {:stream_event, %{content: "one"}}
assert_received {:stream_event, %{content: "one two"}}
end
test "throttles intermediate emissions but always emits the first delta" do
test_pid = self()
sse =
SSE.new(fn event -> send(test_pid, {:stream_event, event}) end,
emit_interval_ms: 60_000
)
sse
|> SSE.feed(chunk_event(content_delta("first")))
|> SSE.feed(chunk_event(content_delta(" second")))
|> SSE.feed(chunk_event(content_delta(" third")))
assert_received {:stream_event, %{content: "first"}}
refute_received {:stream_event, _event}
end
test "tool-call-only streams emit no content events" do
test_pid = self()
sse =
SSE.new(fn event -> send(test_pid, {:stream_event, event}) end, emit_interval_ms: 0)
SSE.feed(
sse,
chunk_event(%{
"choices" => [
%{
"delta" => %{
"tool_calls" => [
%{"index" => 0, "id" => "c", "function" => %{"name" => "n", "arguments" => "{}"}}
]
}
}
]
})
)
refute_received {:stream_event, _event}
end
end