Close TD-15 task housekeeping

This commit is contained in:
2026-06-12 13:20:57 +02:00
parent ae66775cb7
commit f7e1662bca
3 changed files with 95 additions and 14 deletions

View File

@@ -590,7 +590,16 @@ polling in the rebuild wait path.
unchanged (or interval ≥ 1s with backoff); rebuild sequencing has no
`Process.sleep`; CLI-sync round-trip latency stays ≤ current behavior.
### TD-15: `BDS.Tasks` housekeeping (queue type, eviction timers)
### TD-15: `BDS.Tasks` housekeeping (queue type, eviction timers) ✅ DONE (2026-06-12)
**Status: implemented.** `BDS.Tasks` now uses `:queue` for its pending work
queue, so enqueue/dequeue on the hot path are O(1) and the FIFO behavior is
unchanged. Finished-task cleanup now tracks a single live eviction timer ref
instead of scheduling a fresh `send_after/3` on every terminal task; the timer
fires, prunes expired finished tasks, and only reschedules itself if finished
tasks still remain. Coverage now includes a focused task-state test proving
multiple finished tasks share the same live eviction timer and a source guard
that forbids `queue ++` churn.
**Context.** Minor inefficiencies in `tasks.ex`: the pending queue is a list
appended with `++` (O(n) per submit), and **every** finishing task schedules

View File

@@ -69,9 +69,10 @@ defmodule BDS.Tasks do
{:ok,
%{
tasks: %{},
queue: [],
queue: :queue.new(),
running: %{},
ref_to_task: %{}
ref_to_task: %{},
finished_task_eviction_timer: nil
}}
end
@@ -83,8 +84,7 @@ defmodule BDS.Tasks do
if map_size(next_state.running) < max_concurrent() do
{:reply, {:ok, public_task(task)}, start_task(next_state, task.id, work)}
else
{:reply, {:ok, public_task(task)},
%{next_state | queue: next_state.queue ++ [{task.id, work}]}}
{:reply, {:ok, public_task(task)}, enqueue_task(next_state, task.id, work)}
end
end
@@ -143,13 +143,11 @@ defmodule BDS.Tasks do
{:reply, :ok, next_state}
Enum.any?(state.queue, fn {queued_id, _work} -> queued_id == task_id end) ->
queued_task?(state.queue, task_id) ->
next_state =
state
|> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()})
|> Map.update!(:queue, fn queue ->
Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end)
end)
|> remove_queued_task(task_id)
|> start_queued_tasks()
|> schedule_finished_task_eviction()
@@ -213,7 +211,19 @@ defmodule BDS.Tasks do
end
def handle_info(:evict_finished_tasks, state) do
{:noreply, prune_expired_finished_tasks(state)}
next_state =
state
|> Map.put(:finished_task_eviction_timer, nil)
|> prune_expired_finished_tasks()
next_state =
if any_finished_tasks?(next_state) do
schedule_finished_task_eviction(next_state)
else
next_state
end
{:noreply, next_state}
end
def handle_info({ref, result}, state) do
@@ -315,11 +325,11 @@ defmodule BDS.Tasks do
map_size(state.running) >= max_concurrent() ->
state
state.queue == [] ->
:queue.is_empty(state.queue) ->
state
true ->
[{task_id, work} | remaining] = state.queue
{{:value, {task_id, work}}, remaining} = :queue.out(state.queue)
state
|> Map.put(:queue, remaining)
@@ -390,8 +400,12 @@ defmodule BDS.Tasks do
defp broadcast_terminal_task(_task), do: :ok
defp schedule_finished_task_eviction(state) do
Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms())
state
if state.finished_task_eviction_timer do
state
else
timer_ref = Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms())
%{state | finished_task_eviction_timer: timer_ref}
end
end
defp prune_expired_finished_tasks(state) do
@@ -405,6 +419,32 @@ defmodule BDS.Tasks do
%{state | tasks: tasks}
end
defp enqueue_task(state, task_id, work) do
%{state | queue: :queue.in({task_id, work}, state.queue)}
end
defp queued_task?(queue, task_id) do
queue
|> :queue.to_list()
|> Enum.any?(fn {queued_id, _work} -> queued_id == task_id end)
end
defp remove_queued_task(state, task_id) do
remaining_queue =
state.queue
|> :queue.to_list()
|> Enum.reject(fn {queued_id, _work} -> queued_id == task_id end)
|> :queue.from_list()
%{state | queue: remaining_queue}
end
defp any_finished_tasks?(state) do
Enum.any?(state.tasks, fn {_task_id, task} ->
task.status in [:completed, :failed, :cancelled]
end)
end
defp expired_finished_task?(%{status: status, finished_at: %DateTime{} = finished_at}, now)
when status in [:completed, :failed, :cancelled] do
DateTime.diff(now, finished_at, :millisecond) >= finished_task_ttl_ms()

View File

@@ -266,6 +266,37 @@ defmodule BDS.TasksTest do
assert running.id in task_ids
end
test "finished task eviction uses a single live timer" do
Application.put_env(:bds, :tasks,
max_concurrent: 3,
progress_throttle_ms: 250,
finished_task_ttl_ms: 50
)
assert {:ok, first} = BDS.Tasks.register_external_task("first finished")
assert {:ok, second} = BDS.Tasks.register_external_task("second finished")
assert :ok = BDS.Tasks.complete_task(first.id)
first_timer = :sys.get_state(BDS.Tasks).finished_task_eviction_timer
assert is_reference(first_timer)
assert is_integer(Process.read_timer(first_timer))
assert :ok = BDS.Tasks.complete_task(second.id)
second_timer = :sys.get_state(BDS.Tasks).finished_task_eviction_timer
assert second_timer == first_timer
assert is_integer(Process.read_timer(second_timer))
end
test "task queue implementation avoids list append churn" do
source = File.read!("lib/bds/tasks.ex")
assert String.contains?(source, ":queue"), "tasks queue should use :queue"
refute String.contains?(source, "queue ++"),
"tasks queue should not append with ++"
end
test "terminal task states are broadcast on PubSub" do
Phoenix.PubSub.subscribe(BDS.PubSub, BDS.Tasks.topic())
@@ -304,4 +335,5 @@ defmodule BDS.TasksTest do
defp wait_for_task(_task_id, _predicate, 0) do
flunk("task did not reach expected state")
end
end