chore: cleaned up old task handling
This commit is contained in:
@@ -129,11 +129,7 @@ _None._ All modules previously on the queue have been split; refresh the queue i
|
|||||||
|
|
||||||
## 13. `BDS.Tasks` Memory Growth
|
## 13. `BDS.Tasks` Memory Growth
|
||||||
|
|
||||||
**Status:** open, bounded in practice.
|
**Status:** ✅ done (2026-05-01). `BDS.Tasks` now evicts terminal tasks (`:completed`/`:failed`/`:cancelled`) after a configurable TTL (`:finished_task_ttl_ms`, default 1 h). Eviction is triggered by the GenServer after a task finishes and also pruned on read calls (`get_task`, `status_snapshot`, `list_tasks`, `list_running_tasks`), while pending/running tasks remain retained. The UI snapshot still separately limits recently finished entries with `recent_finished_limit`.
|
||||||
|
|
||||||
**Risk:** the `tasks` map grows for the lifetime of the BEAM unless the UI calls `clear_completed`/`clear_finished`. Long-running desktop sessions could accumulate thousands of finished tasks.
|
|
||||||
|
|
||||||
**Plan:** TTL eviction in the `BDS.Tasks` GenServer (e.g., drop `:completed`/`:failed`/`:cancelled` older than 1 h); already partially mitigated by `recent_finished_limit` in `build_status_snapshot/1`.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -159,6 +155,8 @@ Most tests share the SQLite repo and named GenServers (`BDS.Tasks`, `BDS.Search`
|
|||||||
|
|
||||||
### 2026-05-01
|
### 2026-05-01
|
||||||
|
|
||||||
|
- **`BDS.Tasks` memory growth**: added configurable TTL eviction for terminal tasks in the `BDS.Tasks` GenServer (`:finished_task_ttl_ms`, default 1 h). Finished tasks are pruned by delayed GenServer cleanup and on task read calls; active tasks are preserved. Added regression coverage that completes a task with a tiny TTL and verifies it disappears without `clear_finished/0` while a running task remains. Section 13 is closed.
|
||||||
|
|
||||||
- **Atom/string key duality**: added `BDS.MapUtils.attr/3` and a regression test that scans `lib/**/*.ex` and `lib/**/*.heex` for same-name atom/string `Map.get` fallback reads. Replaced same-name atom/string boundary reads across AI attrs, rendering assigns, pagination/archive contexts, UI command/filter params, metadata category settings, metadata-diff repair payloads, CLI sync payloads, chat tool call normalization, and misc editor duplicate/metadata-diff payload rendering. Remaining mixed-key scan hits are intentionally different-key fallbacks (for example camelCase/snake_case JSON compatibility) or atom-only/string-only boundaries. Section 12 is closed.
|
- **Atom/string key duality**: added `BDS.MapUtils.attr/3` and a regression test that scans `lib/**/*.ex` and `lib/**/*.heex` for same-name atom/string `Map.get` fallback reads. Replaced same-name atom/string boundary reads across AI attrs, rendering assigns, pagination/archive contexts, UI command/filter params, metadata category settings, metadata-diff repair payloads, CLI sync payloads, chat tool call normalization, and misc editor duplicate/metadata-diff payload rendering. Remaining mixed-key scan hits are intentionally different-key fallbacks (for example camelCase/snake_case JSON compatibility) or atom-only/string-only boundaries. Section 12 is closed.
|
||||||
|
|
||||||
- **`Jason.decode!/1` on external HTTP responses**: replaced the 2 scoped OpenAI-compatible runtime response decodes with `Jason.decode/1` and tagged `{:error, %{kind: :invalid_json_response, reason: reason}}` propagation for malformed `/models` and `/chat/completions` bodies. Added regressions covering endpoint model listing through a fake HTTP client and generation through a local Bandit server. Section 9 is closed.
|
- **`Jason.decode!/1` on external HTTP responses**: replaced the 2 scoped OpenAI-compatible runtime response decodes with `Jason.decode/1` and tagged `{:error, %{kind: :invalid_json_response, reason: reason}}` propagation for malformed `/models` and `/chat/completions` bodies. Added regressions covering endpoint model listing through a fake HTTP client and generation through a local Bandit server. Section 9 is closed.
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ defmodule BDS.Tasks do
|
|||||||
@default_max_concurrent 3
|
@default_max_concurrent 3
|
||||||
@default_progress_throttle_ms 250
|
@default_progress_throttle_ms 250
|
||||||
@default_recent_finished_limit 10
|
@default_recent_finished_limit 10
|
||||||
|
@default_finished_task_ttl_ms :timer.hours(1)
|
||||||
|
|
||||||
def start_link(_opts) do
|
def start_link(_opts) do
|
||||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
||||||
@@ -85,18 +86,22 @@ defmodule BDS.Tasks do
|
|||||||
end
|
end
|
||||||
|
|
||||||
def handle_call({:get_task, task_id}, _from, state) do
|
def handle_call({:get_task, task_id}, _from, state) do
|
||||||
|
state = prune_expired_finished_tasks(state)
|
||||||
{:reply, state.tasks[task_id] && public_task(state.tasks[task_id]), state}
|
{:reply, state.tasks[task_id] && public_task(state.tasks[task_id]), state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_call(:status_snapshot, _from, state) do
|
def handle_call(:status_snapshot, _from, state) do
|
||||||
|
state = prune_expired_finished_tasks(state)
|
||||||
{:reply, build_status_snapshot(state), state}
|
{:reply, build_status_snapshot(state), state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_call(:list_tasks, _from, state) do
|
def handle_call(:list_tasks, _from, state) do
|
||||||
|
state = prune_expired_finished_tasks(state)
|
||||||
{:reply, all_tasks(state) |> Enum.map(&public_task/1), state}
|
{:reply, all_tasks(state) |> Enum.map(&public_task/1), state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_call(:list_running_tasks, _from, state) do
|
def handle_call(:list_running_tasks, _from, state) do
|
||||||
|
state = prune_expired_finished_tasks(state)
|
||||||
{:reply, running_tasks(state) |> Enum.map(&public_task/1), state}
|
{:reply, running_tasks(state) |> Enum.map(&public_task/1), state}
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -129,6 +134,7 @@ defmodule BDS.Tasks do
|
|||||||
|> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
|
|> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
|
||||||
|> remove_running(task_id, ref)
|
|> remove_running(task_id, ref)
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:reply, :ok, next_state}
|
{:reply, :ok, next_state}
|
||||||
|
|
||||||
@@ -140,6 +146,7 @@ defmodule BDS.Tasks do
|
|||||||
Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end)
|
Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end)
|
||||||
end)
|
end)
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:reply, :ok, next_state}
|
{:reply, :ok, next_state}
|
||||||
|
|
||||||
@@ -170,6 +177,7 @@ defmodule BDS.Tasks do
|
|||||||
finished_at: DateTime.utc_now()
|
finished_at: DateTime.utc_now()
|
||||||
})
|
})
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:reply, :ok, next_state}
|
{:reply, :ok, next_state}
|
||||||
end
|
end
|
||||||
@@ -183,6 +191,7 @@ defmodule BDS.Tasks do
|
|||||||
finished_at: DateTime.utc_now()
|
finished_at: DateTime.utc_now()
|
||||||
})
|
})
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:reply, :ok, next_state}
|
{:reply, :ok, next_state}
|
||||||
end
|
end
|
||||||
@@ -192,6 +201,10 @@ defmodule BDS.Tasks do
|
|||||||
{:noreply, maybe_report_progress(state, task_id, value, message)}
|
{:noreply, maybe_report_progress(state, task_id, value, message)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_info(:evict_finished_tasks, state) do
|
||||||
|
{:noreply, prune_expired_finished_tasks(state)}
|
||||||
|
end
|
||||||
|
|
||||||
def handle_info({ref, result}, state) do
|
def handle_info({ref, result}, state) do
|
||||||
case state.ref_to_task[ref] do
|
case state.ref_to_task[ref] do
|
||||||
nil ->
|
nil ->
|
||||||
@@ -225,6 +238,7 @@ defmodule BDS.Tasks do
|
|||||||
end
|
end
|
||||||
|> remove_running(task_id, ref)
|
|> remove_running(task_id, ref)
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:noreply, next_state}
|
{:noreply, next_state}
|
||||||
end
|
end
|
||||||
@@ -255,6 +269,7 @@ defmodule BDS.Tasks do
|
|||||||
end
|
end
|
||||||
|> remove_running(task_id, ref)
|
|> remove_running(task_id, ref)
|
||||||
|> start_queued_tasks()
|
|> start_queued_tasks()
|
||||||
|
|> schedule_finished_task_eviction()
|
||||||
|
|
||||||
{:noreply, next_state}
|
{:noreply, next_state}
|
||||||
end
|
end
|
||||||
@@ -347,6 +362,29 @@ defmodule BDS.Tasks do
|
|||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp schedule_finished_task_eviction(state) do
|
||||||
|
Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms())
|
||||||
|
state
|
||||||
|
end
|
||||||
|
|
||||||
|
defp prune_expired_finished_tasks(state) do
|
||||||
|
now = DateTime.utc_now()
|
||||||
|
|
||||||
|
tasks =
|
||||||
|
Map.reject(state.tasks, fn {_task_id, task} ->
|
||||||
|
expired_finished_task?(task, now)
|
||||||
|
end)
|
||||||
|
|
||||||
|
%{state | tasks: tasks}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp expired_finished_task?(%{status: status, finished_at: %DateTime{} = finished_at}, now)
|
||||||
|
when status in [:completed, :failed, :cancelled] do
|
||||||
|
DateTime.diff(now, finished_at, :millisecond) >= finished_task_ttl_ms()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp expired_finished_task?(_task, _now), do: false
|
||||||
|
|
||||||
defp public_task(nil), do: nil
|
defp public_task(nil), do: nil
|
||||||
|
|
||||||
defp public_task(task) do
|
defp public_task(task) do
|
||||||
@@ -460,6 +498,11 @@ defmodule BDS.Tasks do
|
|||||||
|> Keyword.get(:recent_finished_limit, @default_recent_finished_limit)
|
|> Keyword.get(:recent_finished_limit, @default_recent_finished_limit)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp finished_task_ttl_ms do
|
||||||
|
Application.get_env(:bds, :tasks, [])
|
||||||
|
|> Keyword.get(:finished_task_ttl_ms, @default_finished_task_ttl_ms)
|
||||||
|
end
|
||||||
|
|
||||||
defp attr(attrs, key) do
|
defp attr(attrs, key) do
|
||||||
cond do
|
cond do
|
||||||
Map.has_key?(attrs, key) -> Map.get(attrs, key)
|
Map.has_key?(attrs, key) -> Map.get(attrs, key)
|
||||||
|
|||||||
@@ -143,7 +143,10 @@ defmodule BDS.TasksTest do
|
|||||||
assert snapshot.running_task_overflow == 1
|
assert snapshot.running_task_overflow == 1
|
||||||
assert snapshot.running_task_message == "preview build: halfway"
|
assert snapshot.running_task_message == "preview build: halfway"
|
||||||
|
|
||||||
assert [%{id: first_id, status: :running, progress: 0.5, group_name: "Generation"}, %{id: second_id, status: :running}] =
|
assert [
|
||||||
|
%{id: first_id, status: :running, progress: 0.5, group_name: "Generation"},
|
||||||
|
%{id: second_id, status: :running}
|
||||||
|
] =
|
||||||
snapshot.tasks
|
snapshot.tasks
|
||||||
|
|
||||||
assert first_id == first.id
|
assert first_id == first.id
|
||||||
@@ -161,7 +164,8 @@ defmodule BDS.TasksTest do
|
|||||||
%{group_id: "maintenance", group_name: "Maintenance"}
|
%{group_id: "maintenance", group_name: "Maintenance"}
|
||||||
)
|
)
|
||||||
|
|
||||||
completed = wait_for_task(task.id, &(&1.status == :completed and &1.result == %{counts: %{posts: 2}}))
|
completed =
|
||||||
|
wait_for_task(task.id, &(&1.status == :completed and &1.result == %{counts: %{posts: 2}}))
|
||||||
|
|
||||||
snapshot = BDS.Tasks.status_snapshot()
|
snapshot = BDS.Tasks.status_snapshot()
|
||||||
|
|
||||||
@@ -171,10 +175,34 @@ defmodule BDS.TasksTest do
|
|||||||
assert snapshot.running_task_message == nil
|
assert snapshot.running_task_message == nil
|
||||||
|
|
||||||
assert Enum.any?(snapshot.tasks, fn item ->
|
assert Enum.any?(snapshot.tasks, fn item ->
|
||||||
item.id == completed.id and item.status == :completed and item.result == %{counts: %{posts: 2}}
|
item.id == completed.id and item.status == :completed and
|
||||||
|
item.result == %{counts: %{posts: 2}}
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "finished tasks are evicted after the configured TTL" do
|
||||||
|
Application.put_env(:bds, :tasks,
|
||||||
|
max_concurrent: 3,
|
||||||
|
progress_throttle_ms: 250,
|
||||||
|
finished_task_ttl_ms: 1
|
||||||
|
)
|
||||||
|
|
||||||
|
assert {:ok, task} = BDS.Tasks.register_external_task("short lived")
|
||||||
|
assert {:ok, running} = BDS.Tasks.register_external_task("still running")
|
||||||
|
|
||||||
|
on_exit(fn -> _ = BDS.Tasks.complete_task(running.id) end)
|
||||||
|
|
||||||
|
assert :ok = BDS.Tasks.complete_task(task.id)
|
||||||
|
assert wait_for_task(task.id, &(&1.status == :completed)).status == :completed
|
||||||
|
|
||||||
|
Process.sleep(20)
|
||||||
|
|
||||||
|
task_ids = BDS.Tasks.list_tasks() |> Enum.map(& &1.id)
|
||||||
|
|
||||||
|
refute task.id in task_ids
|
||||||
|
assert running.id in task_ids
|
||||||
|
end
|
||||||
|
|
||||||
defp receive_started do
|
defp receive_started do
|
||||||
receive do
|
receive do
|
||||||
{:started, name, pid} -> {name, pid}
|
{:started, name, pid} -> {name, pid}
|
||||||
|
|||||||
Reference in New Issue
Block a user