diff --git a/TECHDEBTS.md b/TECHDEBTS.md index c7ad96f..38e8fb6 100644 --- a/TECHDEBTS.md +++ b/TECHDEBTS.md @@ -590,7 +590,16 @@ polling in the rebuild wait path. unchanged (or interval ≥ 1s with backoff); rebuild sequencing has no `Process.sleep`; CLI-sync round-trip latency stays ≤ current behavior. -### TD-15: `BDS.Tasks` housekeeping (queue type, eviction timers) +### TD-15: `BDS.Tasks` housekeeping (queue type, eviction timers) ✅ DONE (2026-06-12) + +**Status: implemented.** `BDS.Tasks` now uses `:queue` for its pending work +queue, so enqueue/dequeue on the hot path are O(1) and the FIFO behavior is +unchanged. Finished-task cleanup now tracks a single live eviction timer ref +instead of scheduling a fresh `send_after/3` on every terminal task; the timer +fires, prunes expired finished tasks, and only reschedules itself if finished +tasks still remain. Coverage now includes a focused task-state test proving +multiple finished tasks share the same live eviction timer and a source guard +that forbids `queue ++` churn. **Context.** Minor inefficiencies in `tasks.ex`: the pending queue is a list appended with `++` (O(n) per submit), and **every** finishing task schedules diff --git a/lib/bds/tasks.ex b/lib/bds/tasks.ex index d0bcb55..44dd011 100644 --- a/lib/bds/tasks.ex +++ b/lib/bds/tasks.ex @@ -69,9 +69,10 @@ defmodule BDS.Tasks do {:ok, %{ tasks: %{}, - queue: [], + queue: :queue.new(), running: %{}, - ref_to_task: %{} + ref_to_task: %{}, + finished_task_eviction_timer: nil }} end @@ -83,8 +84,7 @@ defmodule BDS.Tasks do if map_size(next_state.running) < max_concurrent() do {:reply, {:ok, public_task(task)}, start_task(next_state, task.id, work)} else - {:reply, {:ok, public_task(task)}, - %{next_state | queue: next_state.queue ++ [{task.id, work}]}} + {:reply, {:ok, public_task(task)}, enqueue_task(next_state, task.id, work)} end end @@ -143,13 +143,11 @@ defmodule BDS.Tasks do {:reply, :ok, next_state} - Enum.any?(state.queue, fn {queued_id, _work} -> queued_id == task_id end) -> + queued_task?(state.queue, task_id) -> next_state = state |> update_task(task_id, %{status: :cancelled, finished_at: DateTime.utc_now()}) - |> Map.update!(:queue, fn queue -> - Enum.reject(queue, fn {queued_id, _work} -> queued_id == task_id end) - end) + |> remove_queued_task(task_id) |> start_queued_tasks() |> schedule_finished_task_eviction() @@ -213,7 +211,19 @@ defmodule BDS.Tasks do end def handle_info(:evict_finished_tasks, state) do - {:noreply, prune_expired_finished_tasks(state)} + next_state = + state + |> Map.put(:finished_task_eviction_timer, nil) + |> prune_expired_finished_tasks() + + next_state = + if any_finished_tasks?(next_state) do + schedule_finished_task_eviction(next_state) + else + next_state + end + + {:noreply, next_state} end def handle_info({ref, result}, state) do @@ -315,11 +325,11 @@ defmodule BDS.Tasks do map_size(state.running) >= max_concurrent() -> state - state.queue == [] -> + :queue.is_empty(state.queue) -> state true -> - [{task_id, work} | remaining] = state.queue + {{:value, {task_id, work}}, remaining} = :queue.out(state.queue) state |> Map.put(:queue, remaining) @@ -390,8 +400,12 @@ defmodule BDS.Tasks do defp broadcast_terminal_task(_task), do: :ok defp schedule_finished_task_eviction(state) do - Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms()) - state + if state.finished_task_eviction_timer do + state + else + timer_ref = Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms()) + %{state | finished_task_eviction_timer: timer_ref} + end end defp prune_expired_finished_tasks(state) do @@ -405,6 +419,32 @@ defmodule BDS.Tasks do %{state | tasks: tasks} end + defp enqueue_task(state, task_id, work) do + %{state | queue: :queue.in({task_id, work}, state.queue)} + end + + defp queued_task?(queue, task_id) do + queue + |> :queue.to_list() + |> Enum.any?(fn {queued_id, _work} -> queued_id == task_id end) + end + + defp remove_queued_task(state, task_id) do + remaining_queue = + state.queue + |> :queue.to_list() + |> Enum.reject(fn {queued_id, _work} -> queued_id == task_id end) + |> :queue.from_list() + + %{state | queue: remaining_queue} + end + + defp any_finished_tasks?(state) do + Enum.any?(state.tasks, fn {_task_id, task} -> + task.status in [:completed, :failed, :cancelled] + end) + end + defp expired_finished_task?(%{status: status, finished_at: %DateTime{} = finished_at}, now) when status in [:completed, :failed, :cancelled] do DateTime.diff(now, finished_at, :millisecond) >= finished_task_ttl_ms() diff --git a/test/bds/tasks_test.exs b/test/bds/tasks_test.exs index f7552f1..c295c0c 100644 --- a/test/bds/tasks_test.exs +++ b/test/bds/tasks_test.exs @@ -266,6 +266,37 @@ defmodule BDS.TasksTest do assert running.id in task_ids end + test "finished task eviction uses a single live timer" do + Application.put_env(:bds, :tasks, + max_concurrent: 3, + progress_throttle_ms: 250, + finished_task_ttl_ms: 50 + ) + + assert {:ok, first} = BDS.Tasks.register_external_task("first finished") + assert {:ok, second} = BDS.Tasks.register_external_task("second finished") + + assert :ok = BDS.Tasks.complete_task(first.id) + first_timer = :sys.get_state(BDS.Tasks).finished_task_eviction_timer + assert is_reference(first_timer) + assert is_integer(Process.read_timer(first_timer)) + + assert :ok = BDS.Tasks.complete_task(second.id) + second_timer = :sys.get_state(BDS.Tasks).finished_task_eviction_timer + + assert second_timer == first_timer + assert is_integer(Process.read_timer(second_timer)) + end + + test "task queue implementation avoids list append churn" do + source = File.read!("lib/bds/tasks.ex") + + assert String.contains?(source, ":queue"), "tasks queue should use :queue" + + refute String.contains?(source, "queue ++"), + "tasks queue should not append with ++" + end + test "terminal task states are broadcast on PubSub" do Phoenix.PubSub.subscribe(BDS.PubSub, BDS.Tasks.topic()) @@ -304,4 +335,5 @@ defmodule BDS.TasksTest do defp wait_for_task(_task_id, _predicate, 0) do flunk("task did not reach expected state") end + end