fix: implemented TD-09, supervised workers now receive shutdowna nd can run cooperative cleanup
This commit is contained in:
12
TECHDEBTS.md
12
TECHDEBTS.md
@@ -385,7 +385,17 @@ production code.
|
||||
**Acceptance.** Production module has zero sandbox references; full suite
|
||||
green.
|
||||
|
||||
### TD-09: Graceful task cancellation in `BDS.Tasks`
|
||||
### TD-09: Graceful task cancellation in `BDS.Tasks` ✅ DONE (2026-06-12)
|
||||
|
||||
**Status: implemented.** `BDS.Tasks.cancel_task/1` and
|
||||
`BDS.Scripting.JobRunner.handle_call(:cancel, ...)` now terminate supervised
|
||||
workers through their owning `Task.Supervisor` (`terminate_child/2`), so the
|
||||
worker receives `:shutdown` instead of an immediate `:kill`. Queue bookkeeping
|
||||
and `:cancelled` status semantics are unchanged, but cooperative cleanup now
|
||||
runs before the slot is freed. Coverage now includes a task worker that traps
|
||||
exits, confirms cleanup executes on cancellation, and proves the queued task is
|
||||
promoted afterward, plus a managed scripting job runtime test that traps exits
|
||||
and observes the same shutdown-driven cleanup path.
|
||||
|
||||
**Context.** `Tasks.cancel_task/1` uses `Process.exit(pid, :kill)` — the
|
||||
worker gets no chance to clean up mid-upload or mid-file-write
|
||||
|
||||
@@ -66,7 +66,7 @@ defmodule BDS.Scripting.JobRunner do
|
||||
@impl true
|
||||
def handle_call(:cancel, _from, state) do
|
||||
if is_pid(state.task_pid) do
|
||||
Process.exit(state.task_pid, :kill)
|
||||
_ = Task.Supervisor.terminate_child(BDS.Scripting.TaskSupervisor, state.task_pid)
|
||||
end
|
||||
|
||||
:ok =
|
||||
|
||||
@@ -127,7 +127,7 @@ defmodule BDS.Tasks do
|
||||
cond do
|
||||
Map.has_key?(state.running, task_id) ->
|
||||
%{pid: pid, ref: ref} = state.running[task_id]
|
||||
Process.exit(pid, :kill)
|
||||
_ = Task.Supervisor.terminate_child(BDS.Tasks.TaskSupervisor, pid)
|
||||
|
||||
next_state =
|
||||
state
|
||||
|
||||
@@ -36,6 +36,30 @@ defmodule BDS.Scripting.JobTest do
|
||||
end
|
||||
end
|
||||
|
||||
defmodule ShutdownAwareRuntime do
|
||||
@behaviour BDS.Scripting.Runtime
|
||||
|
||||
@impl true
|
||||
def validate(_source), do: :ok
|
||||
|
||||
@impl true
|
||||
def execute(_source, _entrypoint, _args, opts) do
|
||||
Process.flag(:trap_exit, true)
|
||||
|
||||
if callback = Keyword.get(opts, :on_progress) do
|
||||
callback.(%{"phase" => "started", "current" => 1, "total" => 2})
|
||||
end
|
||||
|
||||
test_pid = Keyword.fetch!(opts, :test_pid)
|
||||
|
||||
receive do
|
||||
{:EXIT, _from, :shutdown} ->
|
||||
send(test_pid, :job_cleanup_ran)
|
||||
{:ok, :cancelled}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
setup do
|
||||
original = Application.fetch_env!(:bds, :scripting)
|
||||
|
||||
@@ -90,6 +114,25 @@ defmodule BDS.Scripting.JobTest do
|
||||
assert cancelled_job.finished_at != nil
|
||||
end
|
||||
|
||||
test "cancelled managed script jobs receive shutdown for cleanup" do
|
||||
Application.put_env(:bds, :scripting,
|
||||
runtime: ShutdownAwareRuntime,
|
||||
timeout: 300_000,
|
||||
max_reductions: 5_000_000,
|
||||
job_timeout: :infinity,
|
||||
job_max_reductions: :none
|
||||
)
|
||||
|
||||
assert {:ok, job} = BDS.Scripting.start_job("irrelevant", "main", [], test_pid: self())
|
||||
_running_job = wait_for_job(job.id, &(&1.status == :running))
|
||||
|
||||
assert :ok = BDS.Scripting.cancel_job(job.id)
|
||||
assert_receive :job_cleanup_ran, 1_000
|
||||
|
||||
cancelled_job = wait_for_job(job.id, &(&1.status == :cancelled))
|
||||
assert cancelled_job.finished_at != nil
|
||||
end
|
||||
|
||||
test "killing a runner detaches it from JobStore (CSM-004)" do
|
||||
Application.put_env(:bds, :scripting,
|
||||
runtime: BlockingRuntime,
|
||||
|
||||
@@ -95,6 +95,41 @@ defmodule BDS.TasksTest do
|
||||
assert wait_for_task(third.id, &(&1.status == :completed)).status == :completed
|
||||
end
|
||||
|
||||
test "cancel_task delivers shutdown so cleanup runs before freeing the slot" do
|
||||
Application.put_env(:bds, :tasks, max_concurrent: 1, progress_throttle_ms: 250)
|
||||
|
||||
runner = self()
|
||||
|
||||
cleanup_work = fn _report ->
|
||||
Process.flag(:trap_exit, true)
|
||||
send(runner, {:started, "cleanup", self()})
|
||||
|
||||
receive do
|
||||
{:EXIT, _from, :shutdown} ->
|
||||
send(runner, :cleanup_ran)
|
||||
{:ok, :cancelled}
|
||||
end
|
||||
end
|
||||
|
||||
queued_work = fn _report ->
|
||||
send(runner, {:started, "queued", self()})
|
||||
{:ok, :queued_completed}
|
||||
end
|
||||
|
||||
assert {:ok, running} = BDS.Tasks.submit_task("cleanup", cleanup_work)
|
||||
assert {:ok, queued} = BDS.Tasks.submit_task("queued", queued_work)
|
||||
|
||||
assert {"cleanup", _pid} = receive_started()
|
||||
assert BDS.Tasks.get_task(queued.id).status == :pending
|
||||
|
||||
assert :ok = BDS.Tasks.cancel_task(running.id)
|
||||
assert_receive :cleanup_ran, 1_000
|
||||
assert wait_for_task(running.id, &(&1.status == :cancelled)).status == :cancelled
|
||||
|
||||
assert {"queued", _pid} = receive_started()
|
||||
assert wait_for_task(queued.id, &(&1.status == :completed)).result == :queued_completed
|
||||
end
|
||||
|
||||
test "progress reports within 250ms throttle window are silently dropped" do
|
||||
assert {:ok, task} = BDS.Tasks.register_external_task("fast progress")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user