Close TD-13 publishing GenServer call surface

This commit is contained in:
2026-06-12 13:13:00 +02:00
parent 4859c9708a
commit 741979fc39
5 changed files with 233 additions and 96 deletions

View File

@@ -518,7 +518,17 @@ debounced save. Queries during a rebuild keep hitting the old index.
**Acceptance.** Neighbor queries return while a duplicate scan is running
(test with a large synthetic index); debounce/flush semantics unchanged.
### TD-13: Slim down the `Publishing` GenServer call surface
### TD-13: Slim down the `Publishing` GenServer call surface ✅ DONE (2026-06-12)
**Status: implemented.** `BDS.Publishing` now keeps only SCP mtime state in its
GenServer. Publish-job creation, lookup, and status updates run directly through
`Repo`, while background-task job updates use a stable Repo caller so sandboxed
tests still exercise the real async path. SCP uploads no longer round-trip
through `should_upload_scp_file` / `mark_uploaded_scp_file` per file; each
target now batches one filter call for changed files and one bulk record call
for successfully uploaded mtimes. Coverage includes a focused batching test that
proves a multi-file SCP publish keeps bookkeeping traffic bounded instead of
scaling linearly with file count.
**Context.** `Publishing` does Repo writes inside `handle_call`
(`:upload_site`, `:update_job`) and the uploader makes per-file synchronous

View File

@@ -28,52 +28,7 @@ defmodule BDS.Publishing do
project = Projects.get_project!(project_id)
normalized_credentials = normalize_credentials(credentials)
targets = build_upload_targets(Projects.project_data_dir(project), normalized_credentials)
GenServer.call(__MODULE__, {:upload_site, project_id, normalized_credentials, targets, opts})
end
@spec get_job(String.t()) :: PublishJob.t() | nil
def get_job(job_id) when is_binary(job_id) do
GenServer.call(__MODULE__, {:get_job, job_id})
end
@impl true
def init(_state) do
{:ok, %{scp_uploads: %{}}}
end
@impl true
def handle_call({:get_job, job_id}, _from, state) do
{:reply, Repo.get(PublishJob, job_id), state}
end
@impl true
def handle_call({:update_job, job_id, attrs}, _from, state) do
with %PublishJob{} = job <- Repo.get(PublishJob, job_id) do
attrs = Map.put(attrs, :updated_at, Persistence.now_ms())
job |> PublishJob.changeset(attrs) |> Repo.update()
end
{:reply, :ok, state}
end
@impl true
def handle_call({:should_upload_scp_file, upload_key, local_mtime}, _from, state) do
should_upload? =
case state.scp_uploads[upload_key] do
nil -> true
recorded_mtime -> local_mtime > recorded_mtime
end
{:reply, should_upload?, state}
end
@impl true
def handle_call({:mark_uploaded_scp_file, upload_key, local_mtime}, _from, state) do
{:reply, :ok, put_in(state, [:scp_uploads, upload_key], local_mtime)}
end
@impl true
def handle_call({:upload_site, project_id, credentials, targets, opts}, _from, state) do
job_id = "publish-" <> Integer.to_string(System.unique_integer([:positive, :monotonic]))
uploader = build_uploader(Keyword.put_new(opts, :project_id, project_id))
now = Persistence.now_ms()
@@ -83,10 +38,10 @@ defmodule BDS.Publishing do
project_id: project_id,
status: :pending,
task_id: nil,
ssh_host: credentials.ssh_host,
ssh_user: credentials.ssh_user,
ssh_remote_path: credentials.ssh_remote_path,
ssh_mode: credentials.ssh_mode,
ssh_host: normalized_credentials.ssh_host,
ssh_user: normalized_credentials.ssh_user,
ssh_remote_path: normalized_credentials.ssh_remote_path,
ssh_mode: normalized_credentials.ssh_mode,
targets: Enum.map(targets, &to_string(&1.kind)),
error: nil,
inserted_at: now,
@@ -102,7 +57,7 @@ defmodule BDS.Publishing do
Tasks.submit_task(
"publish #{project_id}",
fn report ->
run_upload(job_id, credentials, targets, uploader, report)
run_upload(job_id, normalized_credentials, targets, uploader, report)
end,
%{
group_id: project_id,
@@ -115,7 +70,51 @@ defmodule BDS.Publishing do
|> PublishJob.changeset(%{task_id: task.id, updated_at: Persistence.now_ms()})
|> Repo.update!()
{:reply, {:ok, next_job}, state}
{:ok, next_job}
end
@spec get_job(String.t()) :: PublishJob.t() | nil
def get_job(job_id) when is_binary(job_id) do
Repo.get(PublishJob, job_id)
end
@impl true
def init(_state) do
{:ok, %{scp_uploads: %{}}}
end
@impl true
def handle_call({:filter_scp_uploads, project_id, credentials, target_kind, files}, _from, state) do
{files_to_upload, next_uploads} =
Enum.reduce(files, {[], state.scp_uploads}, fn {relative_path, local_mtime}, {acc, uploads} ->
upload_key = scp_upload_key(project_id, credentials, target_kind, relative_path)
if should_upload_mtime?(uploads[upload_key], local_mtime) do
{[{relative_path, local_mtime} | acc], uploads}
else
{acc, uploads}
end
end)
{:reply, Enum.reverse(files_to_upload), %{state | scp_uploads: next_uploads}}
end
@impl true
def handle_call(
{:record_uploaded_scp_files, project_id, credentials, target_kind, uploaded_files},
_from,
state
) do
next_uploads =
Enum.reduce(uploaded_files, state.scp_uploads, fn {relative_path, local_mtime}, uploads ->
Map.put(
uploads,
scp_upload_key(project_id, credentials, target_kind, relative_path),
local_mtime
)
end)
{:reply, :ok, %{state | scp_uploads: next_uploads}}
end
defp run_upload(job_id, credentials, targets, uploader, report) do
@@ -147,7 +146,14 @@ defmodule BDS.Publishing do
end
defp update_job(job_id, attrs) do
GenServer.call(__MODULE__, {:update_job, job_id, attrs})
repo_opts = repo_call_opts()
with %PublishJob{} = job <- Repo.get(PublishJob, job_id, repo_opts) do
attrs = Map.put(attrs, :updated_at, Persistence.now_ms())
_ = job |> PublishJob.changeset(attrs) |> Repo.update(repo_opts)
end
:ok
end
defp build_uploader(opts) do
@@ -188,41 +194,27 @@ defmodule BDS.Publishing do
end
defp run_command_upload(project_id, target, files, credentials, runner, ssh_auth_sock) do
Enum.reduce_while(files, :ok, fn relative_path, :ok ->
local_path = Path.join(target.local_dir, relative_path)
with {:ok, files_with_mtimes} <- collect_file_mtimes(target.local_dir, files) do
files_to_upload =
filter_scp_uploads(project_id, credentials, target.kind, files_with_mtimes)
with {:ok, local_mtime} <- file_mtime(local_path),
true <-
should_upload_scp_file?(
case upload_scp_files(
project_id,
target,
credentials,
target.kind,
relative_path,
local_mtime
runner,
ssh_auth_sock,
files_to_upload,
[]
) do
remote_path = remote_file_spec(credentials, target.remote_dir, relative_path)
case run_command(runner, "scp", ["-q", local_path, remote_path], ssh_auth_sock) do
:ok ->
:ok =
mark_uploaded_scp_file(
project_id,
credentials,
target.kind,
relative_path,
local_mtime
)
{:cont, :ok}
{:ok, uploaded_files} ->
persist_uploaded_scp_files(project_id, credentials, target.kind, uploaded_files)
:ok
{:error, reason} ->
{:halt, {:error, reason}}
{:error, reason}
end
else
false -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end
defp run_command(runner, command, args, ssh_auth_sock) do
@@ -254,22 +246,98 @@ defmodule BDS.Publishing do
end
end
defp should_upload_scp_file?(project_id, credentials, target_kind, relative_path, local_mtime) do
defp filter_scp_uploads(project_id, credentials, target_kind, files_with_mtimes) do
GenServer.call(
__MODULE__,
{:should_upload_scp_file,
scp_upload_key(project_id, credentials, target_kind, relative_path), local_mtime}
{:filter_scp_uploads, project_id, credentials, target_kind, files_with_mtimes}
)
end
defp mark_uploaded_scp_file(project_id, credentials, target_kind, relative_path, local_mtime) do
defp record_uploaded_scp_files(project_id, credentials, target_kind, uploaded_files) do
GenServer.call(
__MODULE__,
{:mark_uploaded_scp_file,
scp_upload_key(project_id, credentials, target_kind, relative_path), local_mtime}
{:record_uploaded_scp_files, project_id, credentials, target_kind, uploaded_files}
)
end
defp upload_scp_files(
_project_id,
_target,
_credentials,
_runner,
_ssh_auth_sock,
[],
uploaded_files
) do
{:ok, Enum.reverse(uploaded_files)}
end
defp upload_scp_files(
project_id,
target,
credentials,
runner,
ssh_auth_sock,
[{relative_path, local_mtime} | rest],
uploaded_files
) do
local_path = Path.join(target.local_dir, relative_path)
remote_path = remote_file_spec(credentials, target.remote_dir, relative_path)
case run_command(runner, "scp", ["-q", local_path, remote_path], ssh_auth_sock) do
:ok ->
upload_scp_files(
project_id,
target,
credentials,
runner,
ssh_auth_sock,
rest,
[{relative_path, local_mtime} | uploaded_files]
)
{:error, reason} ->
persist_uploaded_scp_files(project_id, credentials, target.kind, uploaded_files)
{:error, reason}
end
end
defp collect_file_mtimes(local_dir, files) do
Enum.reduce_while(files, {:ok, []}, fn relative_path, {:ok, acc} ->
local_path = Path.join(local_dir, relative_path)
case file_mtime(local_path) do
{:ok, local_mtime} -> {:cont, {:ok, [{relative_path, local_mtime} | acc]}}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
|> case do
{:ok, files_with_mtimes} -> {:ok, Enum.reverse(files_with_mtimes)}
{:error, reason} -> {:error, reason}
end
end
defp persist_uploaded_scp_files(_project_id, _credentials, _target_kind, []), do: :ok
defp persist_uploaded_scp_files(project_id, credentials, target_kind, uploaded_files) do
record_uploaded_scp_files(
project_id,
credentials,
target_kind,
Enum.reverse(uploaded_files)
)
end
defp should_upload_mtime?(nil, _local_mtime), do: true
defp should_upload_mtime?(recorded_mtime, local_mtime), do: local_mtime > recorded_mtime
defp repo_call_opts do
case Process.whereis(__MODULE__) do
pid when is_pid(pid) -> [caller: pid]
_other -> []
end
end
defp scp_upload_key(project_id, credentials, target_kind, relative_path) do
{
project_id,

View File

@@ -76,18 +76,18 @@ defmodule BDS.CSM020NestedCaseTest do
end
end
describe "Publishing.handle_call :update_job uses with" do
describe "Publishing.update_job/2 uses with" do
test "source code uses with instead of case" do
source = File.read!("lib/bds/publishing.ex")
[func_source] =
Regex.scan(~r/def handle_call\(\{:update_job.*?(?=\n def |\n @impl)/s, source)
Regex.scan(~r/defp update_job\(job_id, attrs\).*?(?=\n defp |\nend)/s, source)
assert func_source |> List.first() |> String.contains?("with"),
"update_job handler should use with"
"update_job should use with"
refute func_source |> List.first() |> String.contains?("case Repo.get"),
"update_job handler should not use case Repo.get"
"update_job should not use case Repo.get"
end
end
end

View File

@@ -14,7 +14,7 @@ defmodule BDS.CSM036ImplTrueTest do
String.contains?(line, "def handle_call(")
end)
assert length(handle_call_lines) >= 5, "expected at least 5 handle_call clauses"
assert length(handle_call_lines) >= 2, "expected at least 2 handle_call clauses"
for {_line, idx} <- handle_call_lines do
preceding = Enum.at(lines, idx - 2)

View File

@@ -280,6 +280,46 @@ defmodule BDS.PublishingTest do
assert elem(html_upload, 1) == ["-q", html_index, "deploy@example.com:/srv/blog/index.html"]
end
test "upload_site batches scp mtime bookkeeping instead of calling the publishing server per file",
%{project: project, temp_dir: temp_dir} do
test_pid = self()
File.mkdir_p!(Path.join([temp_dir, "html", "posts"]))
for index <- 1..5 do
File.write!(Path.join([temp_dir, "html", "posts", "entry-#{index}.html"]), "<html />")
end
credentials = %{
ssh_host: "example.com",
ssh_user: "deploy",
ssh_remote_path: "/srv/blog",
ssh_mode: :scp
}
publishing_pid = Process.whereis(BDS.Publishing)
:erlang.trace(publishing_pid, true, [:receive])
runner = fn command, args, opts ->
send(test_pid, {:command_run, command, args, opts})
{"", 0}
end
assert {:ok, job} =
BDS.Publishing.upload_site(project.id, credentials,
command_runner: runner,
ssh_auth_sock: "/tmp/test-agent.sock"
)
assert wait_for_publish_job(job.id, &(&1.status == :completed)).status == :completed
:erlang.trace(publishing_pid, false, [:receive])
bookkeeping_calls = collect_publishing_bookkeeping_calls(publishing_pid)
assert length(bookkeeping_calls) <= 6
end
test "publish jobs survive a publishing server restart because they are persisted", %{
project: project,
temp_dir: temp_dir
@@ -325,6 +365,25 @@ defmodule BDS.PublishingTest do
end
end
defp collect_publishing_bookkeeping_calls(publishing_pid, acc \\ []) do
receive do
{:trace, ^publishing_pid, :receive, {:"$gen_call", _from, message}}
when is_tuple(message) and tuple_size(message) > 0 and
elem(message, 0) in [
:should_upload_scp_file,
:mark_uploaded_scp_file,
:filter_scp_uploads,
:record_uploaded_scp_files
] ->
collect_publishing_bookkeeping_calls(publishing_pid, [message | acc])
{:trace, ^publishing_pid, :receive, _message} ->
collect_publishing_bookkeeping_calls(publishing_pid, acc)
after
50 -> Enum.reverse(acc)
end
end
defp wait_for_publish_job(job_id, predicate, attempts \\ 100)
defp wait_for_publish_job(job_id, predicate, attempts) when attempts > 0 do