From ae66775cb7b35c2b138a5fb03feb3a8d52a718f9 Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Fri, 12 Jun 2026 13:17:01 +0200 Subject: [PATCH] Close TD-14 replace polling with messaging --- TECHDEBTS.md | 15 +++++- lib/bds/cli_sync.ex | 5 ++ lib/bds/cli_sync/watcher.ex | 33 ++++++++---- lib/bds/desktop/shell_commands.ex | 65 ++++++++++++++++++------ lib/bds/tasks.ex | 27 ++++++++++ test/bds/cli_sync_test.exs | 41 +++++++++++++++ test/bds/desktop/shell_commands_test.exs | 15 ++++++ test/bds/tasks_test.exs | 14 +++++ 8 files changed, 188 insertions(+), 27 deletions(-) diff --git a/TECHDEBTS.md b/TECHDEBTS.md index d9a27f7..c7ad96f 100644 --- a/TECHDEBTS.md +++ b/TECHDEBTS.md @@ -547,7 +547,20 @@ file. Consider whether `scp_uploads` state should be ETS **Acceptance.** Upload of N files makes O(1) GenServer calls for mtime bookkeeping, not O(N)·2; behavior identical for incremental uploads. -### TD-14: Replace polling with messaging (CliSync watcher + rebuild sequencing) +### TD-14: Replace polling with messaging (CliSync watcher + rebuild sequencing) ✅ DONE (2026-06-12) + +**Status: implemented.** `BDS.CliSync.Watcher` now gates notification-table +work behind SQLite `PRAGMA data_version`, so unchanged databases no longer force +repeated `db_notifications` queries at the 100 ms watch cadence; the watcher +still performs a real notification fetch/prune pass on the first poll and on +every external commit boundary. Rebuild sequencing in +`BDS.Desktop.ShellCommands` no longer sleep-polls `Tasks.list_tasks/0`. +`BDS.Tasks` now broadcasts terminal task states on `BDS.PubSub`, and +`wait_for_group_phase/3` subscribes and waits on those messages with the same +deadline semantics. Coverage now includes a watcher test that proves unchanged +`data_version` skips notification queries, a tasks test that proves terminal +state broadcasts, and a shell-command guard test that forbids `Process.sleep` +polling in the rebuild wait path. **Context.** Two polling loops: 1. `CliSync.Watcher` polls the SQLite notifications table every **100 ms diff --git a/lib/bds/cli_sync.ex b/lib/bds/cli_sync.ex index de18f24..6f4701f 100644 --- a/lib/bds/cli_sync.ex +++ b/lib/bds/cli_sync.ex @@ -54,6 +54,11 @@ defmodule BDS.CliSync do end)} end + def data_version do + %{rows: [[version]]} = Repo.query!("PRAGMA data_version", []) + version + end + def prune_notifications(now \\ Persistence.now_ms()) when is_integer(now) do {processed_count, _} = Repo.delete_all( diff --git a/lib/bds/cli_sync/watcher.ex b/lib/bds/cli_sync/watcher.ex index be35fc5..4dbcbaf 100644 --- a/lib/bds/cli_sync/watcher.ex +++ b/lib/bds/cli_sync/watcher.ex @@ -29,7 +29,12 @@ defmodule BDS.CliSync.Watcher do Keyword.get(opts, :poll_interval_ms), @default_poll_interval_ms ), - pubsub: Keyword.get(opts, :pubsub, BDS.PubSub) + pubsub: Keyword.get(opts, :pubsub, BDS.PubSub), + data_version_reader: Keyword.get(opts, :data_version_reader, &CliSync.data_version/0), + notification_fetcher: + Keyword.get(opts, :notification_fetcher, &CliSync.db_file_change_detected/0), + pruner: Keyword.get(opts, :pruner, &CliSync.prune_notifications/0), + last_data_version: nil } {:ok, schedule_poll(state)} @@ -49,18 +54,24 @@ defmodule BDS.CliSync.Watcher do end defp process_notifications(state) do - {:ok, notifications} = CliSync.db_file_change_detected() - {:ok, _pruned} = CliSync.prune_notifications() + current_data_version = state.data_version_reader.() - Enum.each(notifications, fn notification -> - Phoenix.PubSub.broadcast( - state.pubsub, - topic(), - {:entity_changed, notification_payload(notification)} - ) - end) + if state.last_data_version == current_data_version do + %{state | last_data_version: current_data_version} + else + {:ok, notifications} = state.notification_fetcher.() + {:ok, _pruned} = state.pruner.() - state + Enum.each(notifications, fn notification -> + Phoenix.PubSub.broadcast( + state.pubsub, + topic(), + {:entity_changed, notification_payload(notification)} + ) + end) + + %{state | last_data_version: current_data_version} + end end defp notification_payload(notification) do diff --git a/lib/bds/desktop/shell_commands.ex b/lib/bds/desktop/shell_commands.ex index ef0c263..9718cc0 100644 --- a/lib/bds/desktop/shell_commands.ex +++ b/lib/bds/desktop/shell_commands.ex @@ -559,27 +559,62 @@ defmodule BDS.Desktop.ShellCommands do end end - defp wait_for_group_phase(_group_id, _names, timeout) when timeout <= 0, do: :timeout - defp wait_for_group_phase(group_id, names, timeout) do + if timeout <= 0 do + :timeout + else + Phoenix.PubSub.subscribe(BDS.PubSub, Tasks.topic()) + + try do + case group_phase_status(group_id, names) do + :waiting -> wait_for_group_phase_message(group_id, names, timeout) + status -> status + end + after + Phoenix.PubSub.unsubscribe(BDS.PubSub, Tasks.topic()) + end + end + end + + defp wait_for_group_phase_message(group_id, names, timeout) do + started_at = System.monotonic_time(:millisecond) + + receive do + {:task_terminal, task} -> + elapsed = System.monotonic_time(:millisecond) - started_at + + cond do + task.group_id == group_id and task.name in names and task.status == :failed -> + :failed + + task.group_id == group_id and task.name in names -> + case group_phase_status(group_id, names) do + :waiting -> + wait_for_group_phase_message(group_id, names, timeout - elapsed) + + status -> + status + end + + true -> + wait_for_group_phase_message(group_id, names, timeout - elapsed) + end + after + timeout -> + :timeout + end + end + + defp group_phase_status(group_id, names) do tasks = BDS.Tasks.list_tasks() |> Enum.filter(&(&1.group_id == group_id and &1.name in names)) cond do - length(tasks) < length(names) -> - Process.sleep(50) - wait_for_group_phase(group_id, names, timeout - 50) - - Enum.any?(tasks, &(&1.status == :failed)) -> - :failed - - Enum.all?(tasks, &(&1.status == :completed)) -> - :ok - - true -> - Process.sleep(50) - wait_for_group_phase(group_id, names, timeout - 50) + length(tasks) < length(names) -> :waiting + Enum.any?(tasks, &(&1.status == :failed)) -> :failed + Enum.all?(tasks, &(&1.status == :completed)) -> :ok + true -> :waiting end end diff --git a/lib/bds/tasks.ex b/lib/bds/tasks.ex index 402e9ca..d0bcb55 100644 --- a/lib/bds/tasks.ex +++ b/lib/bds/tasks.ex @@ -7,11 +7,14 @@ defmodule BDS.Tasks do @default_progress_throttle_ms 250 @default_recent_finished_limit 10 @default_finished_task_ttl_ms :timer.hours(1) + @topic "tasks" def start_link(_opts) do GenServer.start_link(__MODULE__, %{}, name: __MODULE__) end + def topic, do: @topic + def submit_task(name, work, attrs \\ %{}) when is_binary(name) and is_function(work, 1) and is_map(attrs) do GenServer.call(__MODULE__, {:submit_task, name, work, attrs}) @@ -136,6 +139,8 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + broadcast_terminal_task(next_state.tasks[task_id]) + {:reply, :ok, next_state} Enum.any?(state.queue, fn {queued_id, _work} -> queued_id == task_id end) -> @@ -148,6 +153,8 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + broadcast_terminal_task(next_state.tasks[task_id]) + {:reply, :ok, next_state} state.tasks[task_id] == nil -> @@ -179,6 +186,8 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + broadcast_terminal_task(next_state.tasks[task_id]) + {:reply, :ok, next_state} end @@ -193,6 +202,8 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + broadcast_terminal_task(next_state.tasks[task_id]) + {:reply, :ok, next_state} end @@ -240,6 +251,10 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + if task.status != :cancelled do + broadcast_terminal_task(next_state.tasks[task_id]) + end + {:noreply, next_state} end end @@ -271,6 +286,10 @@ defmodule BDS.Tasks do |> start_queued_tasks() |> schedule_finished_task_eviction() + if task.status != :cancelled and next_state.tasks[task_id].status == :failed do + broadcast_terminal_task(next_state.tasks[task_id]) + end + {:noreply, next_state} end end @@ -362,6 +381,14 @@ defmodule BDS.Tasks do end) end + defp broadcast_terminal_task(nil), do: :ok + + defp broadcast_terminal_task(task) when task.status in [:completed, :failed, :cancelled] do + Phoenix.PubSub.broadcast(BDS.PubSub, topic(), {:task_terminal, public_task(task)}) + end + + defp broadcast_terminal_task(_task), do: :ok + defp schedule_finished_task_eviction(state) do Process.send_after(self(), :evict_finished_tasks, finished_task_ttl_ms()) state diff --git a/test/bds/cli_sync_test.exs b/test/bds/cli_sync_test.exs index 54e3e9e..167a3ce 100644 --- a/test/bds/cli_sync_test.exs +++ b/test/bds/cli_sync_test.exs @@ -76,6 +76,47 @@ defmodule BDS.CliSyncTest do assert is_integer(seen_notification.seen_at) end + test "watcher skips notification queries when sqlite data_version is unchanged" do + test_pid = self() + data_version = :erlang.make_ref() + :persistent_term.put(data_version, [7, 7]) + + data_version_reader = fn -> + [next | rest] = :persistent_term.get(data_version) + :persistent_term.put(data_version, rest) + next + end + + notification_fetcher = fn -> + send(test_pid, :notifications_fetched) + {:ok, []} + end + + pruner = fn -> + send(test_pid, :notifications_pruned) + {:ok, %{processed: 0, unprocessed: 0}} + end + + on_exit(fn -> :persistent_term.erase(data_version) end) + + watcher = + start_supervised!( + {Watcher, + poll_interval_ms: 60_000, + data_version_reader: data_version_reader, + notification_fetcher: notification_fetcher, + pruner: pruner} + ) + + :ok = Watcher.poll_now(watcher) + assert_receive :notifications_fetched, 500 + assert_receive :notifications_pruned, 500 + + :ok = Watcher.poll_now(watcher) + refute_receive :notifications_fetched, 100 + refute_receive :notifications_pruned, 100 + end + test "processed notifications are pruned after one hour and unprocessed notifications after one day" do now = BDS.Persistence.now_ms() diff --git a/test/bds/desktop/shell_commands_test.exs b/test/bds/desktop/shell_commands_test.exs index f5776dc..ae8eeb4 100644 --- a/test/bds/desktop/shell_commands_test.exs +++ b/test/bds/desktop/shell_commands_test.exs @@ -914,6 +914,21 @@ defmodule BDS.Desktop.ShellCommandsTest do assert message =~ "Project database is not initialized" end + test "rebuild sequencing waits on task messages instead of sleep polling" do + source = File.read!("lib/bds/desktop/shell_commands.ex") + + func_source = + Regex.scan(~r/defp wait_for_group_phase(?:_message)?\(.*?(?=\n defp |\nend)/s, source) + |> Enum.map(&List.first/1) + |> Enum.join("\n") + + refute String.contains?(func_source, "Process.sleep"), + "wait_for_group_phase should not use sleep polling" + + assert String.contains?(func_source, "Phoenix.PubSub.subscribe") + assert String.contains?(func_source, "receive") + end + defp wait_for_task(task_id, matcher, timeout \\ 2_000) defp wait_for_task(task_id, _matcher, timeout) when timeout <= 0 do diff --git a/test/bds/tasks_test.exs b/test/bds/tasks_test.exs index 8b8f493..f7552f1 100644 --- a/test/bds/tasks_test.exs +++ b/test/bds/tasks_test.exs @@ -266,6 +266,20 @@ defmodule BDS.TasksTest do assert running.id in task_ids end + test "terminal task states are broadcast on PubSub" do + Phoenix.PubSub.subscribe(BDS.PubSub, BDS.Tasks.topic()) + + assert {:ok, completed} = + BDS.Tasks.submit_task("broadcast completion", fn _report -> {:ok, :done} end, + %{group_id: "broadcast-group", group_name: "Maintenance"} + ) + + assert_receive {:task_terminal, task_event}, 1_000 + assert task_event.id == completed.id + assert task_event.group_id == "broadcast-group" + assert task_event.status == :completed + end + defp receive_started do receive do {:started, name, pid} -> {name, pid}