From 741979fc3909ba1fa437279f1c488fce4cfe626b Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Fri, 12 Jun 2026 13:13:00 +0200 Subject: [PATCH] Close TD-13 publishing GenServer call surface --- TECHDEBTS.md | 12 +- lib/bds/publishing.ex | 248 +++++++++++++++++---------- test/bds/csm020_nested_case_test.exs | 8 +- test/bds/csm036_impl_true_test.exs | 2 +- test/bds/publishing_test.exs | 59 +++++++ 5 files changed, 233 insertions(+), 96 deletions(-) diff --git a/TECHDEBTS.md b/TECHDEBTS.md index 31371b2..d9a27f7 100644 --- a/TECHDEBTS.md +++ b/TECHDEBTS.md @@ -518,7 +518,17 @@ debounced save. Queries during a rebuild keep hitting the old index. **Acceptance.** Neighbor queries return while a duplicate scan is running (test with a large synthetic index); debounce/flush semantics unchanged. -### TD-13: Slim down the `Publishing` GenServer call surface +### TD-13: Slim down the `Publishing` GenServer call surface ✅ DONE (2026-06-12) + +**Status: implemented.** `BDS.Publishing` now keeps only SCP mtime state in its +GenServer. Publish-job creation, lookup, and status updates run directly through +`Repo`, while background-task job updates use a stable Repo caller so sandboxed +tests still exercise the real async path. SCP uploads no longer round-trip +through `should_upload_scp_file` / `mark_uploaded_scp_file` per file; each +target now batches one filter call for changed files and one bulk record call +for successfully uploaded mtimes. Coverage includes a focused batching test that +proves a multi-file SCP publish keeps bookkeeping traffic bounded instead of +scaling linearly with file count. **Context.** `Publishing` does Repo writes inside `handle_call` (`:upload_site`, `:update_job`) and the uploader makes per-file synchronous diff --git a/lib/bds/publishing.ex b/lib/bds/publishing.ex index 53e61f8..bf3a52a 100644 --- a/lib/bds/publishing.ex +++ b/lib/bds/publishing.ex @@ -28,52 +28,7 @@ defmodule BDS.Publishing do project = Projects.get_project!(project_id) normalized_credentials = normalize_credentials(credentials) targets = build_upload_targets(Projects.project_data_dir(project), normalized_credentials) - GenServer.call(__MODULE__, {:upload_site, project_id, normalized_credentials, targets, opts}) - end - @spec get_job(String.t()) :: PublishJob.t() | nil - def get_job(job_id) when is_binary(job_id) do - GenServer.call(__MODULE__, {:get_job, job_id}) - end - - @impl true - def init(_state) do - {:ok, %{scp_uploads: %{}}} - end - - @impl true - def handle_call({:get_job, job_id}, _from, state) do - {:reply, Repo.get(PublishJob, job_id), state} - end - - @impl true - def handle_call({:update_job, job_id, attrs}, _from, state) do - with %PublishJob{} = job <- Repo.get(PublishJob, job_id) do - attrs = Map.put(attrs, :updated_at, Persistence.now_ms()) - job |> PublishJob.changeset(attrs) |> Repo.update() - end - - {:reply, :ok, state} - end - - @impl true - def handle_call({:should_upload_scp_file, upload_key, local_mtime}, _from, state) do - should_upload? = - case state.scp_uploads[upload_key] do - nil -> true - recorded_mtime -> local_mtime > recorded_mtime - end - - {:reply, should_upload?, state} - end - - @impl true - def handle_call({:mark_uploaded_scp_file, upload_key, local_mtime}, _from, state) do - {:reply, :ok, put_in(state, [:scp_uploads, upload_key], local_mtime)} - end - - @impl true - def handle_call({:upload_site, project_id, credentials, targets, opts}, _from, state) do job_id = "publish-" <> Integer.to_string(System.unique_integer([:positive, :monotonic])) uploader = build_uploader(Keyword.put_new(opts, :project_id, project_id)) now = Persistence.now_ms() @@ -83,10 +38,10 @@ defmodule BDS.Publishing do project_id: project_id, status: :pending, task_id: nil, - ssh_host: credentials.ssh_host, - ssh_user: credentials.ssh_user, - ssh_remote_path: credentials.ssh_remote_path, - ssh_mode: credentials.ssh_mode, + ssh_host: normalized_credentials.ssh_host, + ssh_user: normalized_credentials.ssh_user, + ssh_remote_path: normalized_credentials.ssh_remote_path, + ssh_mode: normalized_credentials.ssh_mode, targets: Enum.map(targets, &to_string(&1.kind)), error: nil, inserted_at: now, @@ -102,7 +57,7 @@ defmodule BDS.Publishing do Tasks.submit_task( "publish #{project_id}", fn report -> - run_upload(job_id, credentials, targets, uploader, report) + run_upload(job_id, normalized_credentials, targets, uploader, report) end, %{ group_id: project_id, @@ -115,7 +70,51 @@ defmodule BDS.Publishing do |> PublishJob.changeset(%{task_id: task.id, updated_at: Persistence.now_ms()}) |> Repo.update!() - {:reply, {:ok, next_job}, state} + {:ok, next_job} + end + + @spec get_job(String.t()) :: PublishJob.t() | nil + def get_job(job_id) when is_binary(job_id) do + Repo.get(PublishJob, job_id) + end + + @impl true + def init(_state) do + {:ok, %{scp_uploads: %{}}} + end + + @impl true + def handle_call({:filter_scp_uploads, project_id, credentials, target_kind, files}, _from, state) do + {files_to_upload, next_uploads} = + Enum.reduce(files, {[], state.scp_uploads}, fn {relative_path, local_mtime}, {acc, uploads} -> + upload_key = scp_upload_key(project_id, credentials, target_kind, relative_path) + + if should_upload_mtime?(uploads[upload_key], local_mtime) do + {[{relative_path, local_mtime} | acc], uploads} + else + {acc, uploads} + end + end) + + {:reply, Enum.reverse(files_to_upload), %{state | scp_uploads: next_uploads}} + end + + @impl true + def handle_call( + {:record_uploaded_scp_files, project_id, credentials, target_kind, uploaded_files}, + _from, + state + ) do + next_uploads = + Enum.reduce(uploaded_files, state.scp_uploads, fn {relative_path, local_mtime}, uploads -> + Map.put( + uploads, + scp_upload_key(project_id, credentials, target_kind, relative_path), + local_mtime + ) + end) + + {:reply, :ok, %{state | scp_uploads: next_uploads}} end defp run_upload(job_id, credentials, targets, uploader, report) do @@ -147,7 +146,14 @@ defmodule BDS.Publishing do end defp update_job(job_id, attrs) do - GenServer.call(__MODULE__, {:update_job, job_id, attrs}) + repo_opts = repo_call_opts() + + with %PublishJob{} = job <- Repo.get(PublishJob, job_id, repo_opts) do + attrs = Map.put(attrs, :updated_at, Persistence.now_ms()) + _ = job |> PublishJob.changeset(attrs) |> Repo.update(repo_opts) + end + + :ok end defp build_uploader(opts) do @@ -188,41 +194,27 @@ defmodule BDS.Publishing do end defp run_command_upload(project_id, target, files, credentials, runner, ssh_auth_sock) do - Enum.reduce_while(files, :ok, fn relative_path, :ok -> - local_path = Path.join(target.local_dir, relative_path) + with {:ok, files_with_mtimes} <- collect_file_mtimes(target.local_dir, files) do + files_to_upload = + filter_scp_uploads(project_id, credentials, target.kind, files_with_mtimes) - with {:ok, local_mtime} <- file_mtime(local_path), - true <- - should_upload_scp_file?( - project_id, - credentials, - target.kind, - relative_path, - local_mtime - ) do - remote_path = remote_file_spec(credentials, target.remote_dir, relative_path) + case upload_scp_files( + project_id, + target, + credentials, + runner, + ssh_auth_sock, + files_to_upload, + [] + ) do + {:ok, uploaded_files} -> + persist_uploaded_scp_files(project_id, credentials, target.kind, uploaded_files) + :ok - case run_command(runner, "scp", ["-q", local_path, remote_path], ssh_auth_sock) do - :ok -> - :ok = - mark_uploaded_scp_file( - project_id, - credentials, - target.kind, - relative_path, - local_mtime - ) - - {:cont, :ok} - - {:error, reason} -> - {:halt, {:error, reason}} - end - else - false -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} + {:error, reason} -> + {:error, reason} end - end) + end end defp run_command(runner, command, args, ssh_auth_sock) do @@ -254,22 +246,98 @@ defmodule BDS.Publishing do end end - defp should_upload_scp_file?(project_id, credentials, target_kind, relative_path, local_mtime) do + defp filter_scp_uploads(project_id, credentials, target_kind, files_with_mtimes) do GenServer.call( __MODULE__, - {:should_upload_scp_file, - scp_upload_key(project_id, credentials, target_kind, relative_path), local_mtime} + {:filter_scp_uploads, project_id, credentials, target_kind, files_with_mtimes} ) end - defp mark_uploaded_scp_file(project_id, credentials, target_kind, relative_path, local_mtime) do + defp record_uploaded_scp_files(project_id, credentials, target_kind, uploaded_files) do GenServer.call( __MODULE__, - {:mark_uploaded_scp_file, - scp_upload_key(project_id, credentials, target_kind, relative_path), local_mtime} + {:record_uploaded_scp_files, project_id, credentials, target_kind, uploaded_files} ) end + defp upload_scp_files( + _project_id, + _target, + _credentials, + _runner, + _ssh_auth_sock, + [], + uploaded_files + ) do + {:ok, Enum.reverse(uploaded_files)} + end + + defp upload_scp_files( + project_id, + target, + credentials, + runner, + ssh_auth_sock, + [{relative_path, local_mtime} | rest], + uploaded_files + ) do + local_path = Path.join(target.local_dir, relative_path) + remote_path = remote_file_spec(credentials, target.remote_dir, relative_path) + + case run_command(runner, "scp", ["-q", local_path, remote_path], ssh_auth_sock) do + :ok -> + upload_scp_files( + project_id, + target, + credentials, + runner, + ssh_auth_sock, + rest, + [{relative_path, local_mtime} | uploaded_files] + ) + + {:error, reason} -> + persist_uploaded_scp_files(project_id, credentials, target.kind, uploaded_files) + {:error, reason} + end + end + + defp collect_file_mtimes(local_dir, files) do + Enum.reduce_while(files, {:ok, []}, fn relative_path, {:ok, acc} -> + local_path = Path.join(local_dir, relative_path) + + case file_mtime(local_path) do + {:ok, local_mtime} -> {:cont, {:ok, [{relative_path, local_mtime} | acc]}} + {:error, reason} -> {:halt, {:error, reason}} + end + end) + |> case do + {:ok, files_with_mtimes} -> {:ok, Enum.reverse(files_with_mtimes)} + {:error, reason} -> {:error, reason} + end + end + + defp persist_uploaded_scp_files(_project_id, _credentials, _target_kind, []), do: :ok + + defp persist_uploaded_scp_files(project_id, credentials, target_kind, uploaded_files) do + record_uploaded_scp_files( + project_id, + credentials, + target_kind, + Enum.reverse(uploaded_files) + ) + end + + defp should_upload_mtime?(nil, _local_mtime), do: true + defp should_upload_mtime?(recorded_mtime, local_mtime), do: local_mtime > recorded_mtime + + defp repo_call_opts do + case Process.whereis(__MODULE__) do + pid when is_pid(pid) -> [caller: pid] + _other -> [] + end + end + defp scp_upload_key(project_id, credentials, target_kind, relative_path) do { project_id, diff --git a/test/bds/csm020_nested_case_test.exs b/test/bds/csm020_nested_case_test.exs index eb1d249..6bbb8f6 100644 --- a/test/bds/csm020_nested_case_test.exs +++ b/test/bds/csm020_nested_case_test.exs @@ -76,18 +76,18 @@ defmodule BDS.CSM020NestedCaseTest do end end - describe "Publishing.handle_call :update_job uses with" do + describe "Publishing.update_job/2 uses with" do test "source code uses with instead of case" do source = File.read!("lib/bds/publishing.ex") [func_source] = - Regex.scan(~r/def handle_call\(\{:update_job.*?(?=\n def |\n @impl)/s, source) + Regex.scan(~r/defp update_job\(job_id, attrs\).*?(?=\n defp |\nend)/s, source) assert func_source |> List.first() |> String.contains?("with"), - "update_job handler should use with" + "update_job should use with" refute func_source |> List.first() |> String.contains?("case Repo.get"), - "update_job handler should not use case Repo.get" + "update_job should not use case Repo.get" end end end diff --git a/test/bds/csm036_impl_true_test.exs b/test/bds/csm036_impl_true_test.exs index cc86ba0..5ff7d3e 100644 --- a/test/bds/csm036_impl_true_test.exs +++ b/test/bds/csm036_impl_true_test.exs @@ -14,7 +14,7 @@ defmodule BDS.CSM036ImplTrueTest do String.contains?(line, "def handle_call(") end) - assert length(handle_call_lines) >= 5, "expected at least 5 handle_call clauses" + assert length(handle_call_lines) >= 2, "expected at least 2 handle_call clauses" for {_line, idx} <- handle_call_lines do preceding = Enum.at(lines, idx - 2) diff --git a/test/bds/publishing_test.exs b/test/bds/publishing_test.exs index eba3e57..e3fc747 100644 --- a/test/bds/publishing_test.exs +++ b/test/bds/publishing_test.exs @@ -280,6 +280,46 @@ defmodule BDS.PublishingTest do assert elem(html_upload, 1) == ["-q", html_index, "deploy@example.com:/srv/blog/index.html"] end + test "upload_site batches scp mtime bookkeeping instead of calling the publishing server per file", + %{project: project, temp_dir: temp_dir} do + test_pid = self() + + File.mkdir_p!(Path.join([temp_dir, "html", "posts"])) + + for index <- 1..5 do + File.write!(Path.join([temp_dir, "html", "posts", "entry-#{index}.html"]), "") + end + + credentials = %{ + ssh_host: "example.com", + ssh_user: "deploy", + ssh_remote_path: "/srv/blog", + ssh_mode: :scp + } + + publishing_pid = Process.whereis(BDS.Publishing) + :erlang.trace(publishing_pid, true, [:receive]) + + runner = fn command, args, opts -> + send(test_pid, {:command_run, command, args, opts}) + {"", 0} + end + + assert {:ok, job} = + BDS.Publishing.upload_site(project.id, credentials, + command_runner: runner, + ssh_auth_sock: "/tmp/test-agent.sock" + ) + + assert wait_for_publish_job(job.id, &(&1.status == :completed)).status == :completed + + :erlang.trace(publishing_pid, false, [:receive]) + + bookkeeping_calls = collect_publishing_bookkeeping_calls(publishing_pid) + + assert length(bookkeeping_calls) <= 6 + end + test "publish jobs survive a publishing server restart because they are persisted", %{ project: project, temp_dir: temp_dir @@ -325,6 +365,25 @@ defmodule BDS.PublishingTest do end end + defp collect_publishing_bookkeeping_calls(publishing_pid, acc \\ []) do + receive do + {:trace, ^publishing_pid, :receive, {:"$gen_call", _from, message}} + when is_tuple(message) and tuple_size(message) > 0 and + elem(message, 0) in [ + :should_upload_scp_file, + :mark_uploaded_scp_file, + :filter_scp_uploads, + :record_uploaded_scp_files + ] -> + collect_publishing_bookkeeping_calls(publishing_pid, [message | acc]) + + {:trace, ^publishing_pid, :receive, _message} -> + collect_publishing_bookkeeping_calls(publishing_pid, acc) + after + 50 -> Enum.reverse(acc) + end + end + defp wait_for_publish_job(job_id, predicate, attempts \\ 100) defp wait_for_publish_job(job_id, predicate, attempts) when attempts > 0 do