fix: implemented TD-07, chat await path with deadline
This commit is contained in:
15
TECHDEBTS.md
15
TECHDEBTS.md
@@ -312,7 +312,20 @@ airplane mode / local model / toast.
|
|||||||
events arrive before the final `{:ok, reply}`; tool-call rounds still work;
|
events arrive before the final `{:ok, reply}`; tool-call rounds still work;
|
||||||
cancellation mid-stream (TD-07) aborts the HTTP request.
|
cancellation mid-stream (TD-07) aborts the HTTP request.
|
||||||
|
|
||||||
### TD-07: Bound the chat await chain; end-to-end timeout & cancellation
|
### TD-07: Bound the chat await chain; end-to-end timeout & cancellation ✅ DONE (2026-06-12)
|
||||||
|
|
||||||
|
**Status: implemented.** `BDS.AI.Chat.send_chat_message/3` no longer waits
|
||||||
|
unboundedly on the supervised chat task: `await_chat_task/2` now applies a
|
||||||
|
global deadline derived from the configured per-request HTTP budget and the
|
||||||
|
bounded tool loop (`BDS.AI.HttpClient.request_timeout_ms() *
|
||||||
|
(chat_max_tool_rounds + 1) + config :bds, :chat, :await_timeout_margin_ms`). On
|
||||||
|
deadline expiry it returns `{:error, :chat_timeout}` and shuts the task down via
|
||||||
|
`Task.shutdown/2`, so the caller is released even if the runtime wedges.
|
||||||
|
`config/config.exs` now exposes `:await_timeout_margin_ms` under `:chat`. The
|
||||||
|
acceptance proof is a shutdown-aware blocking runtime test that asserts the
|
||||||
|
timeout result, verifies the task receives shutdown, and confirms the
|
||||||
|
conversation persists only the user message on timeout; existing cancellation
|
||||||
|
and streaming tests remain green.
|
||||||
|
|
||||||
**Context.** `BDS.AI.Chat.send_chat_message/3` blocks the caller (a LiveView
|
**Context.** `BDS.AI.Chat.send_chat_message/3` blocks the caller (a LiveView
|
||||||
process) on a hand-rolled `await_chat_task/1` — a raw `receive` with **no
|
process) on a hand-rolled `await_chat_task/1` — a raw `receive` with **no
|
||||||
|
|||||||
@@ -70,10 +70,13 @@ config :bds, :scripting,
|
|||||||
# streaming: chat completions use SSE when the provider supports it (set to
|
# streaming: chat completions use SSE when the provider supports it (set to
|
||||||
# false for OpenAI-compatible servers that reject the "stream" flag).
|
# false for OpenAI-compatible servers that reject the "stream" flag).
|
||||||
# stream_emit_interval_ms throttles how often streamed content reaches the UI.
|
# stream_emit_interval_ms throttles how often streamed content reaches the UI.
|
||||||
|
# await_timeout_margin_ms is the slack (in ms) added to the chat await deadline, which is
|
||||||
|
# the per-request HTTP timeout times (max_tool_rounds + 1), so the caller never waits forever.
|
||||||
config :bds, :chat,
|
config :bds, :chat,
|
||||||
max_tool_rounds: 10,
|
max_tool_rounds: 10,
|
||||||
streaming: true,
|
streaming: true,
|
||||||
stream_emit_interval_ms: 100
|
stream_emit_interval_ms: 100,
|
||||||
|
await_timeout_margin_ms: 5_000
|
||||||
|
|
||||||
config :bds, :embeddings,
|
config :bds, :embeddings,
|
||||||
backend: BDS.Embeddings.Backends.Neural,
|
backend: BDS.Embeddings.Backends.Neural,
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ defmodule BDS.AI.Chat do
|
|||||||
@title_max_output_tokens 256
|
@title_max_output_tokens 256
|
||||||
@chat_title_max_length 30
|
@chat_title_max_length 30
|
||||||
@chat_max_tool_rounds 10
|
@chat_max_tool_rounds 10
|
||||||
|
@chat_await_timeout_margin_ms 5_000
|
||||||
@default_context_window 128_000
|
@default_context_window 128_000
|
||||||
|
|
||||||
@spec start_chat(map()) :: {:ok, map()} | {:error, Ecto.Changeset.t()}
|
@spec start_chat(map()) :: {:ok, map()} | {:error, Ecto.Changeset.t()}
|
||||||
@@ -208,7 +209,7 @@ defmodule BDS.AI.Chat do
|
|||||||
send(task.pid, :sandbox_ready)
|
send(task.pid, :sandbox_ready)
|
||||||
|
|
||||||
try do
|
try do
|
||||||
await_chat_task(task)
|
await_chat_task(task, chat_await_timeout_ms())
|
||||||
after
|
after
|
||||||
InFlight.unregister(conversation.id)
|
InFlight.unregister(conversation.id)
|
||||||
end
|
end
|
||||||
@@ -756,9 +757,20 @@ defmodule BDS.AI.Chat do
|
|||||||
# BoundedToolLoop: the tool-calling round count is capped by
|
# BoundedToolLoop: the tool-calling round count is capped by
|
||||||
# config.chat_max_tool_rounds (falling back to the built-in default).
|
# config.chat_max_tool_rounds (falling back to the built-in default).
|
||||||
defp chat_max_tool_rounds do
|
defp chat_max_tool_rounds do
|
||||||
|
chat_config(:max_tool_rounds, @chat_max_tool_rounds)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Overall deadline, in milliseconds, that send_chat_message hands to
# await_chat_task/2. It is the per-request HTTP timeout multiplied by
# (configured max tool rounds + 1), plus the :await_timeout_margin_ms value
# from the :chat config (falling back to @chat_await_timeout_margin_ms).
# NOTE(review): the "+ 1" presumably accounts for the final, non-tool request
# that follows the last tool round - confirm against the tool loop.
defp chat_await_timeout_ms do
|
||||||
|
per_request_timeout_ms = BDS.AI.HttpClient.request_timeout_ms()
|
||||||
|
|
||||||
|
per_request_timeout_ms * (chat_max_tool_rounds() + 1) +
|
||||||
|
chat_config(:await_timeout_margin_ms, @chat_await_timeout_margin_ms)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp chat_config(key, default) do
|
||||||
:bds
|
:bds
|
||||||
|> Application.get_env(:chat, [])
|
|> Application.get_env(:chat, [])
|
||||||
|> Keyword.get(:max_tool_rounds, @chat_max_tool_rounds)
|
|> Keyword.get(key, default)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp chat_system_prompt(project_id, tools) do
|
defp chat_system_prompt(project_id, tools) do
|
||||||
@@ -853,7 +865,7 @@ defmodule BDS.AI.Chat do
|
|||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
defp await_chat_task(task) do
|
defp await_chat_task(task, timeout_ms) do
|
||||||
ref = task.ref
|
ref = task.ref
|
||||||
|
|
||||||
receive do
|
receive do
|
||||||
@@ -879,6 +891,10 @@ defmodule BDS.AI.Chat do
|
|||||||
_other ->
|
_other ->
|
||||||
{:error, :cancelled}
|
{:error, :cancelled}
|
||||||
end
|
end
|
||||||
|
after
|
||||||
|
timeout_ms ->
|
||||||
|
_ = Task.shutdown(task, 100)
|
||||||
|
{:error, :chat_timeout}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,14 @@ defmodule BDS.AI.HttpClient do
|
|||||||
@default_get_max_retries 2
|
@default_get_max_retries 2
|
||||||
@default_retry_delay_ms 500
|
@default_retry_delay_ms 500
|
||||||
|
|
||||||
|
# Returns the timeout budget for a single HTTP request: the larger of the
# :connect_timeout_ms and :receive_timeout_ms settings, each falling back to
# its module default when unset.
# NOTE(review): this takes the max of the two timeouts rather than their sum,
# and does not factor in retries or retry delays (see @default_get_max_retries
# and @default_retry_delay_ms above) - confirm this is the intended upper
# bound for one request before relying on it for end-to-end deadlines.
# NOTE(review): config/2 is defined outside this view; presumably it reads the
# BDS.AI.HttpClient application env - verify.
@spec request_timeout_ms() :: pos_integer()
|
||||||
|
def request_timeout_ms do
|
||||||
|
max(
|
||||||
|
config(:connect_timeout_ms, @default_connect_timeout_ms),
|
||||||
|
config(:receive_timeout_ms, @default_receive_timeout_ms)
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
@spec get(String.t(), %{String.t() => String.t()}) ::
|
@spec get(String.t(), %{String.t() => String.t()}) ::
|
||||||
{:ok, %{status: non_neg_integer(), headers: map(), body: binary()}} | {:error, term()}
|
{:ok, %{status: non_neg_integer(), headers: map(), body: binary()}} | {:error, term()}
|
||||||
def get(url, headers) when is_binary(url) and is_map(headers) do
|
def get(url, headers) when is_binary(url) and is_map(headers) do
|
||||||
|
|||||||
@@ -279,6 +279,24 @@ defmodule BDS.AITest do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Test runtime stub whose generate/3 blocks until the calling process is shut
# down. It reports progress to the pid given in opts[:test_pid]:
#   * {:blocking_runtime_started, endpoint, request, self()} on entry
#   * :blocking_runtime_shutdown when an {:EXIT, _, :shutdown} message arrives
# If no shutdown arrives within 5_000 ms it returns a "too late" reply instead.
defmodule ShutdownAwareBlockingRuntime do
|
||||||
|
def generate(endpoint, request, opts) do
|
||||||
|
# Trap exits so a :shutdown exit signal is delivered as an {:EXIT, ...}
# message that the receive below can observe.
Process.flag(:trap_exit, true)
|
||||||
|
|
||||||
|
# :test_pid is required; Keyword.fetch!/2 raises if it is missing.
test_pid = Keyword.fetch!(opts, :test_pid)
|
||||||
|
send(test_pid, {:blocking_runtime_started, endpoint, request, self()})
|
||||||
|
|
||||||
|
receive do
|
||||||
|
{:EXIT, _from, :shutdown} ->
|
||||||
|
send(test_pid, :blocking_runtime_shutdown)
|
||||||
|
# Exit with the same reason now that the shutdown has been reported.
exit(:shutdown)
|
||||||
|
after
|
||||||
|
5_000 ->
|
||||||
|
{:ok, %{content: "too late", usage: %{input_tokens: 1, output_tokens: 1}}}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# Always returns another tool call and never a final answer, so a chat would
|
# Always returns another tool call and never a final answer, so a chat would
|
||||||
# loop forever if the round count were not bounded.
|
# loop forever if the round count were not bounded.
|
||||||
defmodule LoopingToolRuntime do
|
defmodule LoopingToolRuntime do
|
||||||
@@ -1772,6 +1790,64 @@ defmodule BDS.AITest do
|
|||||||
assert Enum.map(messages, & &1.role) == [:user]
|
assert Enum.map(messages, & &1.role) == [:user]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Acceptance test for the bounded chat await: with a runtime that never
# replies, send_chat_message must return {:error, :chat_timeout} quickly, the
# runtime process must observe a shutdown, and only the user message may be
# persisted for the conversation.
# NOTE(review): this test rewrites global application env (:chat and
# BDS.AI.HttpClient) and restores it in on_exit; presumably the enclosing
# module does not run async - confirm.
@tag :chat_timeout
|
||||||
|
test "send_chat_message times out a stalled chat round and keeps persisted state consistent" do
|
||||||
|
original_chat_config = Application.get_env(:bds, :chat, [])
|
||||||
|
original_http_config = Application.get_env(:bds, BDS.AI.HttpClient, [])
|
||||||
|
|
||||||
|
# Shrink the tool-round cap and the await margin so the deadline is short.
Application.put_env(
|
||||||
|
:bds,
|
||||||
|
:chat,
|
||||||
|
original_chat_config
|
||||||
|
|> Keyword.put(:max_tool_rounds, 1)
|
||||||
|
|> Keyword.put(:await_timeout_margin_ms, 25)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Shrink the HTTP timeouts too. NOTE(review): assuming HttpClient reads these
# keys, the resulting deadline should be 50 * (1 + 1) + 25 = 125 ms - confirm.
Application.put_env(
|
||||||
|
:bds,
|
||||||
|
BDS.AI.HttpClient,
|
||||||
|
original_http_config
|
||||||
|
|> Keyword.put(:connect_timeout_ms, 50)
|
||||||
|
|> Keyword.put(:receive_timeout_ms, 50)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Restore both env entries once the test finishes.
on_exit(fn ->
|
||||||
|
Application.put_env(:bds, :chat, original_chat_config)
|
||||||
|
Application.put_env(:bds, BDS.AI.HttpClient, original_http_config)
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:ok, _endpoint} =
|
||||||
|
BDS.AI.put_endpoint(
|
||||||
|
:online,
|
||||||
|
%{
|
||||||
|
url: "https://api.example.test/v1",
|
||||||
|
api_key: "online-secret",
|
||||||
|
model: "gpt-4o-mini"
|
||||||
|
},
|
||||||
|
secret_backend: FakeSecretBackend
|
||||||
|
)
|
||||||
|
|
||||||
|
assert {:ok, conversation} = BDS.AI.start_chat(%{model: "gpt-4o-mini"})
|
||||||
|
|
||||||
|
started_at = System.monotonic_time(:millisecond)
|
||||||
|
|
||||||
|
assert {:error, :chat_timeout} =
|
||||||
|
BDS.AI.send_chat_message(conversation.id, "Please wait forever",
|
||||||
|
runtime: ShutdownAwareBlockingRuntime,
|
||||||
|
test_pid: self(),
|
||||||
|
secret_backend: FakeSecretBackend
|
||||||
|
)
|
||||||
|
|
||||||
|
elapsed_ms = System.monotonic_time(:millisecond) - started_at
|
||||||
|
|
||||||
|
# The call must return well before the stub's own 5_000 ms fallback.
assert elapsed_ms < 1_000
|
||||||
|
assert_receive {:blocking_runtime_started, _endpoint, %{operation: :chat}, _pid}, 500
|
||||||
|
assert_receive :blocking_runtime_shutdown, 500
|
||||||
|
|
||||||
|
messages = BDS.AI.list_chat_messages(conversation.id)
|
||||||
|
assert Enum.map(messages, & &1.role) == [:user]
|
||||||
|
end
|
||||||
|
|
||||||
test "get_surface_state and put_surface_state persist and restore surface UI state" do
|
test "get_surface_state and put_surface_state persist and restore surface UI state" do
|
||||||
assert {:ok, conversation} = BDS.AI.start_chat(%{title: "Surface State", model: "gpt-4.1"})
|
assert {:ok, conversation} = BDS.AI.start_chat(%{title: "Surface State", model: "gpt-4.1"})
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
defmodule BDS.WxrParserTest do
|
defmodule BDS.WxrParserTest do
|
||||||
use ExUnit.Case, async: true
|
use ExUnit.Case, async: false
|
||||||
|
|
||||||
alias BDS.WxrParser
|
alias BDS.WxrParser
|
||||||
|
|
||||||
@@ -102,6 +102,8 @@ defmodule BDS.WxrParserTest do
|
|||||||
</rss>
|
</rss>
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
_warmup = WxrParser.parse_xml(sample_wxr_xml())
|
||||||
|
|
||||||
atom_count_before = :erlang.system_info(:atom_count)
|
atom_count_before = :erlang.system_info(:atom_count)
|
||||||
parsed = WxrParser.parse_xml(xml)
|
parsed = WxrParser.parse_xml(xml)
|
||||||
atom_count_after = :erlang.system_info(:atom_count)
|
atom_count_after = :erlang.system_info(:atom_count)
|
||||||
|
|||||||
Reference in New Issue
Block a user