diff --git a/TECHDEBTS.md b/TECHDEBTS.md index bf0f332..b1f15b8 100644 --- a/TECHDEBTS.md +++ b/TECHDEBTS.md @@ -238,10 +238,23 @@ batching speedup. ## Phase 2 — Unbounded blocking & cancellation -### TD-06: Real SSE streaming for chat +### TD-06: Real SSE streaming for chat ✅ DONE (2026-06-11) **Depends on TD-02 (Req).** +**Status: implemented.** Chat requests now send `"stream": true` (+ +`stream_options.include_usage`) and consume the SSE response incrementally +via `HttpClient.post_stream/5` (Req `into:`). `BDS.AI.SSE` assembles content +deltas, tool-call fragments, and usage, emitting **cumulative content +snapshots** throttled to `stream_emit_interval_ms` (default 100ms) — replace +semantics, so the chat editor needed no changes and tool rounds reset +naturally. Streaming applies only to `operation: :chat` with an `:on_stream` +callback, can be disabled via `config :bds, :chat, streaming: false`, and +providers that ignore the stream flag are auto-detected by content-type and +parsed as plain JSON. Cancellation kills the chat task, which aborts the +underlying connection (server-observed in tests). Persistence semantics are +unchanged (one assistant row per round, same usage normalization). + **Context.** `OpenAICompatibleRuntime.generate/3` never sets `"stream": true`; the UI's `{:chat_streaming_content, ...}` event fires exactly once with the complete response, i.e. streaming is fake. For local models this is the diff --git a/config/config.exs b/config/config.exs index 3554e0f..b397b72 100644 --- a/config/config.exs +++ b/config/config.exs @@ -67,7 +67,13 @@ config :bds, :scripting, transform_max_toasts_total: 20, transform_max_toast_length: 300 -config :bds, :chat, max_tool_rounds: 10 +# streaming: chat completions use SSE when the provider supports it (set to +# false for OpenAI-compatible servers that reject the "stream" flag). +# stream_emit_interval_ms throttles how often streamed content reaches the UI. +config :bds, :chat, + max_tool_rounds: 10, + streaming: true, + stream_emit_interval_ms: 100 config :bds, :embeddings, backend: BDS.Embeddings.Backends.Neural, diff --git a/lib/bds/ai/chat.ex b/lib/bds/ai/chat.ex index 33c2b32..63b3806 100644 --- a/lib/bds/ai/chat.ex +++ b/lib/bds/ai/chat.ex @@ -565,9 +565,10 @@ defmodule BDS.AI.Chat do rounds_left ) do request = build_chat_request(conversation, messages, model, project_id, tools) + generate_opts = put_stream_callback(opts, conversation.id) with {:ok, response} <- - runtime.generate(Runtime.endpoint_with_model(endpoint, model), request, opts), + runtime.generate(Runtime.endpoint_with_model(endpoint, model), request, generate_opts), {:ok, assistant_message} <- persist_assistant_response(conversation.id, response), :ok <- touch_conversation(conversation.id) do if is_binary(Map.get(response, :content)) and String.trim(Map.get(response, :content)) != "" do @@ -921,6 +922,26 @@ defmodule BDS.AI.Chat do end end + # When someone is listening for chat events, ask the runtime to stream: + # it emits cumulative content snapshots, which the editor renders with + # replace semantics. The full-content notify after each round stays the + # authoritative final state (and the only event for non-streaming runtimes). + defp put_stream_callback(opts, conversation_id) do + case Keyword.get(opts, :event_target) do + nil -> + opts + + _target -> + Keyword.put(opts, :on_stream, fn %{content: content} -> + if is_binary(content) and String.trim(content) != "" do + notify_chat_event(opts, {:chat_streaming_content, conversation_id, content}) + end + + :ok + end) + end + end + defp notify_chat_event(opts, event) do case Keyword.get(opts, :event_target) do pid when is_pid(pid) -> send(pid, event) diff --git a/lib/bds/ai/http_client.ex b/lib/bds/ai/http_client.ex index 7f46dc6..bed6b5b 100644 --- a/lib/bds/ai/http_client.ex +++ b/lib/bds/ai/http_client.ex @@ -59,6 +59,62 @@ defmodule BDS.AI.HttpClient do |> normalize_result() end + @doc """ + Streaming POST: body chunks of a 200 response are folded into `acc` via + `reducer.(chunk, acc)` as they arrive; non-200 bodies are collected whole + for error reporting. Returns the final accumulator alongside the response. + + Never retried (same reasoning as `post/3`), and `accept-encoding` is + disabled so event-stream chunks arrive uncompressed. The request runs in + the calling process — killing that process aborts the underlying + connection, which is what makes mid-stream chat cancellation work. + """ + @spec post_stream(String.t(), %{String.t() => String.t()}, binary(), acc, (binary(), acc -> + acc)) :: + {:ok, %{status: non_neg_integer(), headers: map(), body: binary()}, acc} + | {:error, term()} + when acc: term() + def post_stream(url, headers, body, acc, reducer) + when is_binary(url) and is_map(headers) and is_binary(body) and is_function(reducer, 2) do + into = fn {:data, data}, {req, resp} -> + resp = + if resp.status == 200 do + next_acc = reducer.(data, Req.Response.get_private(resp, :bds_stream_acc, acc)) + Req.Response.put_private(resp, :bds_stream_acc, next_acc) + else + %{resp | body: collected_body(resp.body) <> data} + end + + {:cont, {req, resp}} + end + + [ + method: :post, + url: url, + headers: headers, + body: body, + retry: false, + compressed: false, + into: into + ] + |> Keyword.merge(base_options()) + |> Req.request() + |> case do + {:ok, %Req.Response{} = resp} -> + {:ok, %{status: resp.status, headers: normalize_headers(resp.headers), body: collected_body(resp.body)}, + Req.Response.get_private(resp, :bds_stream_acc, acc)} + + {:error, %Req.TransportError{reason: reason}} -> + {:error, reason} + + {:error, reason} -> + {:error, reason} + end + end + + defp collected_body(body) when is_binary(body), do: body + defp collected_body(_body), do: "" + defp base_options do [ connect_options: [timeout: config(:connect_timeout_ms, @default_connect_timeout_ms)], diff --git a/lib/bds/ai/openai_compatible_runtime.ex b/lib/bds/ai/openai_compatible_runtime.ex index b8308f0..6f8150a 100644 --- a/lib/bds/ai/openai_compatible_runtime.ex +++ b/lib/bds/ai/openai_compatible_runtime.ex @@ -4,6 +4,7 @@ defmodule BDS.AI.OpenAICompatibleRuntime do require Logger alias BDS.AI.HttpClient + alias BDS.AI.SSE def list_models(endpoint, opts \\ []) when is_map(endpoint) and is_list(opts) do http_client = Keyword.get(opts, :http_client, HttpClient) @@ -22,7 +23,7 @@ defmodule BDS.AI.OpenAICompatibleRuntime do end end - def generate(endpoint, request, _opts) when is_map(endpoint) and is_map(request) do + def generate(endpoint, request, opts) when is_map(endpoint) and is_map(request) do url = completions_url(endpoint.url) headers = @@ -41,6 +42,14 @@ defmodule BDS.AI.OpenAICompatibleRuntime do |> maybe_disable_thinking(request.model) |> maybe_put_tools(Map.get(request, :tools, [])) + if stream?(request, opts) do + generate_streaming(url, headers, payload, request, Keyword.fetch!(opts, :on_stream)) + else + generate_blocking(url, headers, payload, request) + end + end + + defp generate_blocking(url, headers, payload, request) do payload_json = Jason.encode!(payload) Logger.debug( @@ -81,6 +90,81 @@ defmodule BDS.AI.OpenAICompatibleRuntime do end end + # Streaming variant: same request payload plus stream flags; SSE chunks are + # folded into a BDS.AI.SSE assembler that emits cumulative content + # snapshots to `on_stream` as they arrive. The assembled message goes + # through the same normalization as the blocking path. + defp generate_streaming(url, headers, payload, request, on_stream) do + payload_json = + payload + |> Map.put("stream", true) + |> Map.put("stream_options", %{"include_usage" => true}) + |> Jason.encode!() + + Logger.debug( + "AI OpenAI-compatible streaming request operation=#{inspect(Map.get(request, :operation))} model=#{inspect(request.model)} url=#{url} payload_size=#{byte_size(payload_json)}" + ) + + sse = SSE.new(on_stream, emit_interval_ms: stream_emit_interval_ms()) + + case HttpClient.post_stream(url, headers, payload_json, sse, fn chunk, acc -> + SSE.feed(acc, chunk) + end) do + {:ok, %{status: 200, headers: response_headers}, sse} -> + if event_stream?(response_headers) do + assembled = SSE.finish(sse) + + {:ok, + %{ + content: assembled.content, + json: decode_json_content(assembled.content), + tool_calls: normalize_tool_calls(assembled.tool_calls), + usage: normalize_usage(assembled.usage || %{}) + }} + else + # The provider ignored the stream flag and sent a plain completion. + normalize_response(SSE.raw_body(sse)) + end + + {:ok, %{status: status, body: body}, _sse} -> + Logger.error( + "AI OpenAI-compatible streaming HTTP error status=#{status} body=#{String.slice(body, 0, 2000)}" + ) + + {:error, %{kind: :http_error, status: status, body: body}} + + {:error, reason} -> + Logger.error("AI OpenAI-compatible streaming request failed: #{inspect(reason)}") + {:error, %{kind: :http_error, reason: reason}} + end + end + + # Streaming is opt-in per request (the caller passes :on_stream), limited + # to interactive chat, and can be disabled globally for providers that do + # not support SSE (config :bds, :chat, streaming: false). + defp stream?(request, opts) do + Map.get(request, :operation) == :chat and + is_function(Keyword.get(opts, :on_stream), 1) and + chat_config(:streaming, true) + end + + defp stream_emit_interval_ms, do: chat_config(:stream_emit_interval_ms, 100) + + defp event_stream?(headers) do + case headers["content-type"] do + content_type when is_binary(content_type) -> + String.contains?(content_type, "text/event-stream") + + _missing -> + # No content type: trust the request we made and parse as SSE. + true + end + end + + defp chat_config(key, default) do + :bds |> Application.get_env(:chat, []) |> Keyword.get(key, default) + end + defp normalize_response(body) do with {:ok, payload} <- decode_json_body(body) do message = get_in(payload, ["choices", Access.at(0), "message"]) || %{} @@ -88,19 +172,22 @@ defmodule BDS.AI.OpenAICompatibleRuntime do tool_calls = normalize_tool_calls(message["tool_calls"] || []) usage = normalize_usage(payload["usage"] || %{}) - json = - case content do - nil -> - nil + {:ok, + %{ + content: content, + json: decode_json_content(content), + tool_calls: tool_calls, + usage: usage + }} + end + end - value when is_binary(value) -> - case Jason.decode(value) do - {:ok, decoded} when is_map(decoded) -> decoded - _other -> nil - end - end + defp decode_json_content(nil), do: nil - {:ok, %{content: content, json: json, tool_calls: tool_calls, usage: usage}} + defp decode_json_content(content) when is_binary(content) do + case Jason.decode(content) do + {:ok, decoded} when is_map(decoded) -> decoded + _other -> nil end end diff --git a/lib/bds/ai/sse.ex b/lib/bds/ai/sse.ex new file mode 100644 index 0000000..f7c6c7d --- /dev/null +++ b/lib/bds/ai/sse.ex @@ -0,0 +1,176 @@ +defmodule BDS.AI.SSE do + @moduledoc """ + Incremental assembler for OpenAI-compatible `text/event-stream` chat + completions. + + Fed raw transport chunks via `feed/2`, it buffers partial events, decodes + `data:` payloads, and accumulates content deltas, tool-call fragments, and + usage. Content is reported to the optional `on_event` callback as + **cumulative snapshots** (`%{content: binary}`) — replace semantics, which + matches how the chat editor renders streaming state and resets naturally + between tool rounds. Emissions are throttled to `:emit_interval_ms` + (the first delta always emits immediately for perceived latency). + + `finish/1` returns the assembled message in OpenAI wire shape so the + runtime can reuse its non-streaming normalization: + `%{content: binary | nil, tool_calls: [%{"id" => _, "function" => %{"name" => _, "arguments" => json_string}}], usage: map | nil}`. + """ + + defstruct buffer: "", + raw: [], + content: [], + content?: false, + tool_calls: %{}, + usage: nil, + done?: false, + on_event: nil, + emit_interval_ms: 100, + last_emit_at: nil + + @type t :: %__MODULE__{} + + @spec new((map() -> any()) | nil, keyword()) :: t() + def new(on_event \\ nil, opts \\ []) when is_list(opts) do + %__MODULE__{ + on_event: on_event, + emit_interval_ms: Keyword.get(opts, :emit_interval_ms, 100) + } + end + + @spec feed(t(), binary()) :: t() + def feed(%__MODULE__{done?: true} = sse, _chunk), do: sse + + def feed(%__MODULE__{} = sse, chunk) when is_binary(chunk) do + sse = %{sse | raw: [chunk | sse.raw]} + parts = String.split(sse.buffer <> chunk, ~r/\r?\n\r?\n/) + {complete_events, [rest]} = Enum.split(parts, -1) + + Enum.reduce(complete_events, %{sse | buffer: rest}, &process_event(&2, &1)) + end + + @doc """ + The unparsed transport bytes, for callers that discover after the fact + that the response was not an event stream (e.g. a provider that ignored + the `stream` flag and answered with plain JSON). + """ + @spec raw_body(t()) :: binary() + def raw_body(%__MODULE__{} = sse) do + sse.raw |> Enum.reverse() |> IO.iodata_to_binary() + end + + @spec finish(t()) :: %{content: binary() | nil, tool_calls: [map()], usage: map() | nil} + def finish(%__MODULE__{} = sse) do + # A final event may arrive without its trailing blank line. + sse = + case String.trim(sse.buffer) do + "" -> sse + remnant -> process_event(%{sse | buffer: ""}, remnant) + end + + %{ + content: assembled_content(sse), + tool_calls: assembled_tool_calls(sse), + usage: sse.usage + } + end + + defp process_event(%{done?: true} = sse, _event), do: sse + + defp process_event(sse, event) do + data = + event + |> String.split(~r/\r?\n/) + |> Enum.flat_map(&data_line/1) + |> Enum.join("\n") + + cond do + data == "" -> + sse + + String.trim(data) == "[DONE]" -> + %{sse | done?: true} + + true -> + case Jason.decode(data) do + {:ok, payload} when is_map(payload) -> apply_payload(sse, payload) + _other -> sse + end + end + end + + defp data_line("data: " <> rest), do: [rest] + defp data_line("data:" <> rest), do: [rest] + defp data_line(_line), do: [] + + defp apply_payload(sse, payload) do + delta = get_in(payload, ["choices", Access.at(0), "delta"]) || %{} + + sse + |> apply_content(delta["content"]) + |> apply_tool_calls(delta["tool_calls"]) + |> apply_usage(payload["usage"]) + end + + defp apply_content(sse, content) when is_binary(content) and content != "" do + %{sse | content: [content | sse.content], content?: true} + |> maybe_emit() + end + + defp apply_content(sse, _content), do: sse + + defp apply_tool_calls(sse, [_ | _] = fragments) do + Enum.reduce(fragments, sse, fn fragment, acc -> + index = fragment["index"] || 0 + existing = Map.get(acc.tool_calls, index, %{id: nil, name: nil, arguments: []}) + function_part = fragment["function"] || %{} + + merged = %{ + id: existing.id || fragment["id"], + name: existing.name || function_part["name"], + arguments: [existing.arguments, function_part["arguments"] || ""] + } + + %{acc | tool_calls: Map.put(acc.tool_calls, index, merged)} + end) + end + + defp apply_tool_calls(sse, _fragments), do: sse + + defp apply_usage(sse, usage) when is_map(usage) and map_size(usage) > 0, + do: %{sse | usage: usage} + + defp apply_usage(sse, _usage), do: sse + + defp maybe_emit(%{on_event: nil} = sse), do: sse + + defp maybe_emit(sse) do + now = System.monotonic_time(:millisecond) + + if is_nil(sse.last_emit_at) or now - sse.last_emit_at >= sse.emit_interval_ms do + sse.on_event.(%{content: assembled_content(sse) || ""}) + %{sse | last_emit_at: now} + else + sse + end + end + + defp assembled_content(%{content?: false}), do: nil + + defp assembled_content(sse) do + sse.content |> Enum.reverse() |> IO.iodata_to_binary() + end + + defp assembled_tool_calls(sse) do + sse.tool_calls + |> Enum.sort_by(fn {index, _tool_call} -> index end) + |> Enum.map(fn {_index, tool_call} -> + %{ + "id" => tool_call.id, + "function" => %{ + "name" => tool_call.name, + "arguments" => IO.iodata_to_binary(tool_call.arguments) + } + } + end) + end +end diff --git a/test/bds/ai/chat_streaming_test.exs b/test/bds/ai/chat_streaming_test.exs new file mode 100644 index 0000000..d0fa2e8 --- /dev/null +++ b/test/bds/ai/chat_streaming_test.exs @@ -0,0 +1,164 @@ +defmodule BDS.AI.ChatStreamingTest do + use ExUnit.Case, async: false + + defmodule StreamingChatPlug do + import Plug.Conn + + def init(opts), do: opts + + def call(conn, _opts) do + {:ok, body, conn} = read_body(conn) + payload = Jason.decode!(body) + + if payload["stream"] == true do + stream_chat(conn) + else + # Chat-title generation and other one-shot requests stay non-streaming. + conn + |> put_resp_content_type("application/json") + |> send_resp( + 200, + Jason.encode!(%{ + "choices" => [%{"message" => %{"content" => "Story Time"}}], + "usage" => %{"prompt_tokens" => 1, "completion_tokens" => 1} + }) + ) + end + end + + defp stream_chat(conn) do + conn = + conn + |> put_resp_content_type("text/event-stream") + |> send_chunked(200) + + case Application.get_env(:bds, :chat_stream_scenario, :short) do + :short -> stream_short(conn) + :endless -> stream_endless(conn) + end + end + + defp stream_short(conn) do + events = + [ + delta_event(%{"content" => "Once"}), + delta_event(%{"content" => " upon"}), + delta_event(%{"content" => " a time"}), + "data: " <> + Jason.encode!(%{ + "choices" => [], + "usage" => %{"prompt_tokens" => 9, "completion_tokens" => 4} + }) <> "\n\n", + "data: [DONE]\n\n" + ] + + Enum.reduce_while(events, conn, fn event, conn -> + case chunk(conn, event) do + {:ok, conn} -> {:cont, conn} + {:error, _reason} -> {:halt, conn} + end + end) + end + + defp stream_endless(conn) do + case chunk(conn, delta_event(%{"content" => "tick "})) do + {:ok, conn} -> + Process.sleep(50) + stream_endless(conn) + + {:error, _reason} -> + send(test_pid(), :sse_client_disconnected) + conn + end + end + + defp delta_event(delta) do + "data: " <> Jason.encode!(%{"choices" => [%{"delta" => delta}]}) <> "\n\n" + end + + defp test_pid, do: Application.get_env(:bds, :chat_stream_test_pid) + end + + setup do + :ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo) + + Application.put_env(:bds, :chat_stream_test_pid, self()) + Application.put_env(:bds, :chat_stream_scenario, :short) + + original_chat = Application.fetch_env(:bds, :chat) + + Application.put_env( + :bds, + :chat, + Keyword.merge(Application.get_env(:bds, :chat, []), stream_emit_interval_ms: 0) + ) + + on_exit(fn -> + Application.delete_env(:bds, :chat_stream_scenario) + + case original_chat do + {:ok, value} -> Application.put_env(:bds, :chat, value) + :error -> Application.delete_env(:bds, :chat) + end + end) + + server = start_supervised!({Bandit, plug: StreamingChatPlug, port: 0, startup_log: false}) + {:ok, {_address, port}} = ThousandIsland.listener_info(server) + + assert {:ok, _endpoint} = + BDS.AI.put_endpoint(:online, %{ + url: "http://127.0.0.1:#{port}/v1", + api_key: "sk-stream", + model: "stream-model" + }) + + assert :ok = BDS.AI.set_airplane_mode(false) + assert {:ok, conversation} = BDS.AI.start_chat(%{model: "stream-model"}) + + {:ok, conversation: conversation} + end + + test "incremental content events arrive before the final reply and persistence matches", %{ + conversation: conversation + } do + conversation_id = conversation.id + + assert {:ok, reply} = + BDS.AI.send_chat_message(conversation_id, "tell me a story", + event_target: self() + ) + + assert reply.assistant_message.content == "Once upon a time" + + assert_received {:chat_streaming_content, ^conversation_id, "Once"} + assert_received {:chat_streaming_content, ^conversation_id, "Once upon"} + assert_received {:chat_streaming_content, ^conversation_id, "Once upon a time"} + + messages = BDS.AI.list_chat_messages(conversation_id) + assistant_message = List.last(messages) + assert assistant_message.role == :assistant + assert assistant_message.content == "Once upon a time" + assert assistant_message.token_usage_input == 9 + assert assistant_message.token_usage_output == 4 + end + + test "cancel_chat mid-stream aborts the HTTP request", %{conversation: conversation} do + Application.put_env(:bds, :chat_stream_scenario, :endless) + conversation_id = conversation.id + test_pid = self() + + task = + Task.async(fn -> + BDS.AI.send_chat_message(conversation_id, "stream forever", event_target: test_pid) + end) + + # Wait until tokens are actually flowing before cancelling. + assert_receive {:chat_streaming_content, ^conversation_id, _content}, 2_000 + + assert :ok = BDS.AI.cancel_chat(conversation_id) + assert {:error, :cancelled} = Task.await(task) + + # The server notices the closed connection — the request was truly aborted. + assert_receive :sse_client_disconnected, 2_000 + end +end diff --git a/test/bds/ai/openai_compatible_runtime_streaming_test.exs b/test/bds/ai/openai_compatible_runtime_streaming_test.exs new file mode 100644 index 0000000..9f183dc --- /dev/null +++ b/test/bds/ai/openai_compatible_runtime_streaming_test.exs @@ -0,0 +1,253 @@ +defmodule BDS.AI.OpenAICompatibleRuntimeStreamingTest do + use ExUnit.Case, async: false + + alias BDS.AI.OpenAICompatibleRuntime + + defmodule SSEPlug do + import Plug.Conn + + def init(opts), do: opts + + def call(conn, _opts) do + {:ok, body, conn} = read_body(conn) + payload = Jason.decode!(body) + send(test_pid(), {:endpoint_request, payload}) + + respond(conn, payload["model"], payload) + end + + defp respond(conn, "stream-content", %{"stream" => true}) do + stream(conn, [ + delta_event(%{"role" => "assistant", "content" => ""}), + delta_event(%{"content" => "Once"}), + delta_event(%{"content" => " upon"}), + delta_event(%{"content" => " a time"}), + ~s(data: ) <> + Jason.encode!(%{ + "choices" => [], + "usage" => %{"prompt_tokens" => 7, "completion_tokens" => 3} + }) <> "\n\n", + "data: [DONE]\n\n" + ]) + end + + defp respond(conn, "stream-tools", %{"stream" => true}) do + stream(conn, [ + delta_event(%{ + "tool_calls" => [ + %{ + "index" => 0, + "id" => "call-1", + "function" => %{"name" => "search_posts", "arguments" => ""} + } + ] + }), + delta_event(%{ + "tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "{\"query\":"}}] + }), + delta_event(%{ + "tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "\"sun\"}"}}] + }), + "data: [DONE]\n\n" + ]) + end + + defp respond(conn, "stream-error", %{"stream" => true}) do + send_resp(conn, 503, ~s({"error":"overloaded"})) + end + + # Simulates a provider that ignores the "stream" flag and answers with a + # plain JSON completion. + defp respond(conn, "ignores-stream", %{"stream" => true}) do + conn + |> put_resp_content_type("application/json") + |> send_resp( + 200, + Jason.encode!(%{ + "choices" => [%{"message" => %{"content" => "plain json despite stream"}}], + "usage" => %{"prompt_tokens" => 5, "completion_tokens" => 2} + }) + ) + end + + defp respond(conn, _model, _payload) do + conn + |> put_resp_content_type("application/json") + |> send_resp( + 200, + Jason.encode!(%{ + "choices" => [%{"message" => %{"content" => "non-streaming reply"}}], + "usage" => %{"prompt_tokens" => 1, "completion_tokens" => 1} + }) + ) + end + + defp delta_event(delta) do + "data: " <> Jason.encode!(%{"choices" => [%{"delta" => delta}]}) <> "\n\n" + end + + defp stream(conn, events) do + conn = + conn + |> put_resp_content_type("text/event-stream") + |> send_chunked(200) + + Enum.reduce_while(events, conn, fn event, conn -> + case chunk(conn, event) do + {:ok, conn} -> {:cont, conn} + {:error, _reason} -> {:halt, conn} + end + end) + end + + defp test_pid, do: Application.get_env(:bds, :sse_plug_test_pid) + end + + setup do + :ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo) + Application.put_env(:bds, :sse_plug_test_pid, self()) + + original_chat = Application.fetch_env(:bds, :chat) + + Application.put_env( + :bds, + :chat, + Keyword.merge(Application.get_env(:bds, :chat, []), stream_emit_interval_ms: 0) + ) + + on_exit(fn -> + case original_chat do + {:ok, value} -> Application.put_env(:bds, :chat, value) + :error -> Application.delete_env(:bds, :chat) + end + end) + + server = start_supervised!({Bandit, plug: SSEPlug, port: 0, startup_log: false}) + {:ok, {_address, port}} = ThousandIsland.listener_info(server) + + {:ok, url: "http://127.0.0.1:#{port}/v1"} + end + + defp chat_request(model) do + %{ + operation: :chat, + model: model, + max_output_tokens: 64, + messages: [%{"role" => "user", "content" => "hello"}] + } + end + + defp stream_collector do + test_pid = self() + fn event -> send(test_pid, {:stream_event, event}) end + end + + test "generate streams cumulative content and returns the assembled response", %{url: url} do + assert {:ok, response} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("stream-content"), + on_stream: stream_collector() + ) + + assert response.content == "Once upon a time" + assert response.tool_calls == [] + assert response.usage.input_tokens == 7 + assert response.usage.output_tokens == 3 + + assert_received {:endpoint_request, payload} + assert payload["stream"] == true + assert payload["stream_options"] == %{"include_usage" => true} + + assert_received {:stream_event, %{content: "Once"}} + assert_received {:stream_event, %{content: "Once upon"}} + assert_received {:stream_event, %{content: "Once upon a time"}} + end + + test "generate assembles tool calls streamed as fragments", %{url: url} do + assert {:ok, response} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("stream-tools"), + on_stream: stream_collector() + ) + + assert response.content == nil + + assert response.tool_calls == [ + %{id: "call-1", name: "search_posts", arguments: %{"query" => "sun"}} + ] + end + + test "an error status during streaming surfaces as a structured error", %{url: url} do + assert {:error, %{kind: :http_error, status: 503}} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("stream-error"), + on_stream: stream_collector() + ) + end + + test "a provider that ignores the stream flag still produces a full response", %{url: url} do + assert {:ok, response} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("ignores-stream"), + on_stream: stream_collector() + ) + + assert response.content == "plain json despite stream" + assert response.usage.input_tokens == 5 + assert response.usage.output_tokens == 2 + end + + test "streaming is skipped when disabled via config", %{url: url} do + Application.put_env( + :bds, + :chat, + Keyword.merge(Application.get_env(:bds, :chat, []), streaming: false) + ) + + assert {:ok, %{content: "non-streaming reply"}} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("any-model"), + on_stream: stream_collector() + ) + + assert_received {:endpoint_request, payload} + refute Map.has_key?(payload, "stream") + refute_received {:stream_event, _event} + end + + test "streaming requires an on_stream callback", %{url: url} do + assert {:ok, %{content: "non-streaming reply"}} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + chat_request("any-model"), + [] + ) + + assert_received {:endpoint_request, payload} + refute Map.has_key?(payload, "stream") + end + + test "non-chat operations never stream", %{url: url} do + request = %{ + operation: :chat_title, + model: "any-model", + max_output_tokens: 32, + messages: [%{"role" => "user", "content" => "Topic: hello"}] + } + + assert {:ok, %{content: "non-streaming reply"}} = + OpenAICompatibleRuntime.generate( + %{url: url, api_key: "sk-test"}, + request, + on_stream: stream_collector() + ) + + assert_received {:endpoint_request, payload} + refute Map.has_key?(payload, "stream") + end +end diff --git a/test/bds/ai/sse_test.exs b/test/bds/ai/sse_test.exs new file mode 100644 index 0000000..39e6d3b --- /dev/null +++ b/test/bds/ai/sse_test.exs @@ -0,0 +1,200 @@ +defmodule BDS.AI.SSETest do + use ExUnit.Case, async: true + + alias BDS.AI.SSE + + defp chunk_event(payload), do: "data: " <> Jason.encode!(payload) <> "\n\n" + + defp content_delta(text) do + %{"choices" => [%{"delta" => %{"content" => text}}]} + end + + test "assembles content from deltas across separate chunks" do + sse = SSE.new(nil) + + sse = + sse + |> SSE.feed(chunk_event(content_delta("Hel"))) + |> SSE.feed(chunk_event(content_delta("lo "))) + |> SSE.feed(chunk_event(content_delta("world"))) + |> SSE.feed("data: [DONE]\n\n") + + assert %{content: "Hello world", tool_calls: [], usage: nil} = SSE.finish(sse) + end + + test "handles events split across arbitrary chunk boundaries" do + raw = + chunk_event(content_delta("alpha ")) <> + chunk_event(content_delta("beta")) <> "data: [DONE]\n\n" + + # Feed the byte stream in 7-byte slices to exercise buffering. + sse = + raw + |> :binary.bin_to_list() + |> Enum.chunk_every(7) + |> Enum.map(&:binary.list_to_bin/1) + |> Enum.reduce(SSE.new(nil), &SSE.feed(&2, &1)) + + assert %{content: "alpha beta"} = SSE.finish(sse) + end + + test "supports CRLF line endings and data lines without a space" do + payload = Jason.encode!(content_delta("crlf")) + sse = SSE.feed(SSE.new(nil), "data:" <> payload <> "\r\n\r\ndata: [DONE]\r\n\r\n") + + assert %{content: "crlf"} = SSE.finish(sse) + end + + test "ignores comments, other fields, and undecodable data" do + sse = + SSE.new(nil) + |> SSE.feed(": keep-alive\n\n") + |> SSE.feed("event: message\nid: 7\n" <> "data: " <> Jason.encode!(content_delta("ok")) <> "\n\n") + |> SSE.feed("data: not-json\n\n") + + assert %{content: "ok"} = SSE.finish(sse) + end + + test "stops processing after [DONE]" do + sse = + SSE.new(nil) + |> SSE.feed(chunk_event(content_delta("kept"))) + |> SSE.feed("data: [DONE]\n\n") + |> SSE.feed(chunk_event(content_delta(" dropped"))) + + assert %{content: "kept"} = SSE.finish(sse) + end + + test "finishes a trailing event that lacks the final blank line" do + sse = SSE.feed(SSE.new(nil), "data: " <> Jason.encode!(content_delta("tail"))) + + assert %{content: "tail"} = SSE.finish(sse) + end + + test "content is nil when the stream carried no content" do + sse = SSE.feed(SSE.new(nil), "data: [DONE]\n\n") + + assert %{content: nil} = SSE.finish(sse) + end + + test "assembles tool calls from fragments in OpenAI wire shape" do + fragments = [ + %{ + "choices" => [ + %{ + "delta" => %{ + "tool_calls" => [ + %{ + "index" => 0, + "id" => "call-1", + "function" => %{"name" => "search_posts", "arguments" => ""} + } + ] + } + } + ] + }, + %{ + "choices" => [ + %{ + "delta" => %{ + "tool_calls" => [ + %{"index" => 0, "function" => %{"arguments" => "{\"query\":"}}, + %{ + "index" => 1, + "id" => "call-2", + "function" => %{"name" => "count_posts", "arguments" => "{}"} + } + ] + } + } + ] + }, + %{ + "choices" => [ + %{ + "delta" => %{ + "tool_calls" => [%{"index" => 0, "function" => %{"arguments" => "\"sun\"}"}}] + } + } + ] + } + ] + + sse = Enum.reduce(fragments, SSE.new(nil), &SSE.feed(&2, chunk_event(&1))) + + assert %{tool_calls: tool_calls} = SSE.finish(sse) + + assert tool_calls == [ + %{ + "id" => "call-1", + "function" => %{"name" => "search_posts", "arguments" => ~s({"query":"sun"})} + }, + %{"id" => "call-2", "function" => %{"name" => "count_posts", "arguments" => "{}"}} + ] + end + + test "captures usage from the final chunk" do + sse = + SSE.new(nil) + |> SSE.feed(chunk_event(content_delta("hi"))) + |> SSE.feed( + chunk_event(%{"choices" => [], "usage" => %{"prompt_tokens" => 7, "completion_tokens" => 2}}) + ) + + assert %{usage: %{"prompt_tokens" => 7, "completion_tokens" => 2}} = SSE.finish(sse) + end + + test "emits cumulative content snapshots to the callback" do + test_pid = self() + sse = SSE.new(fn event -> send(test_pid, {:stream_event, event}) end, emit_interval_ms: 0) + + sse + |> SSE.feed(chunk_event(content_delta("one"))) + |> SSE.feed(chunk_event(content_delta(" two"))) + + assert_received {:stream_event, %{content: "one"}} + assert_received {:stream_event, %{content: "one two"}} + end + + test "throttles intermediate emissions but always emits the first delta" do + test_pid = self() + + sse = + SSE.new(fn event -> send(test_pid, {:stream_event, event}) end, + emit_interval_ms: 60_000 + ) + + sse + |> SSE.feed(chunk_event(content_delta("first"))) + |> SSE.feed(chunk_event(content_delta(" second"))) + |> SSE.feed(chunk_event(content_delta(" third"))) + + assert_received {:stream_event, %{content: "first"}} + refute_received {:stream_event, _event} + end + + test "tool-call-only streams emit no content events" do + test_pid = self() + + sse = + SSE.new(fn event -> send(test_pid, {:stream_event, event}) end, emit_interval_ms: 0) + + SSE.feed( + sse, + chunk_event(%{ + "choices" => [ + %{ + "delta" => %{ + "tool_calls" => [ + %{"index" => 0, "id" => "c", "function" => %{"name" => "n", "arguments" => "{}"}} + ] + } + } + ] + }) + ) + + refute_received {:stream_event, _event} + end +end