Close TD-12 non-blocking embeddings index work

This commit is contained in:
2026-06-12 12:47:01 +02:00
parent cd72998a13
commit 4859c9708a
3 changed files with 338 additions and 21 deletions

View File

@@ -485,7 +485,18 @@ renderers via monitors or a counter.
**Acceptance.** A test issues N concurrent slow renders and asserts they
overlap (wall time « N × single render); stop_preview still drains correctly.
### TD-12: Move HNSW builds and duplicate scans out of `Embeddings.Index` handle_call
### TD-12: Move HNSW builds and duplicate scans out of `Embeddings.Index` handle_call ✅ DONE (2026-06-12)
**Status: implemented.** `BDS.Embeddings.Index` now runs duplicate scans and
HNSW rebuilds in supervised tasks instead of inside `handle_call`, while the
GenServer keeps only the small serialized state surface (current index, label
map, debounce timers, and flush coordination). Neighbor queries continue to hit
the current index while a scan or rebuild is in flight; rebuild requests for a
project coalesce onto the latest requested snapshot; `flush/1` and `flush_all/0`
wait for in-flight rebuilds before persisting; and `forget/1` cancels pending
index work cleanly. Acceptance proof now includes focused concurrency tests for
both slow duplicate scans and slow rebuilds, and the existing debounced
persistence coverage remains green.
**Context.** `Embeddings.Index` (singleton) builds HNSW graphs and runs full
duplicate scans inside `handle_call` with client timeout `:infinity`. A long

View File

@@ -30,6 +30,7 @@ defmodule BDS.Embeddings.Index do
@m 16
@ef_construction 128
@ef_search 64
@meta_key :"$meta"
# ─── Public API ─────────────────────────────────────────────
@@ -98,18 +99,17 @@ defmodule BDS.Embeddings.Index do
@impl true
def init(_opts) do
Process.flag(:trap_exit, true)
{:ok, %{}}
{:ok, %{@meta_key => %{flush_all_waiters: []}}}
end
@impl true
def handle_call({:put, project_id, dimensions, entries}, _from, state) do
def handle_call({:put, project_id, dimensions, entries}, from, state) do
# Cancel any pending debounce for this project first: build_entry/2 returns a
# fresh entry with timer: nil, so without this the previous timer would be
# orphaned (left to fire a redundant save) instead of coalescing.
state = cancel_pending_save(state, project_id)
entry = build_entry(dimensions, entries)
state = state |> Map.put(project_id, entry) |> schedule_save(project_id)
{:reply, :ok, state}
state = start_build(state, project_id, dimensions, entries, from)
{:noreply, state}
end
def handle_call({:neighbors, project_id, query_label, query_vector, limit}, _from, state) do
@@ -125,35 +125,43 @@ defmodule BDS.Embeddings.Index do
end
end
def handle_call({:duplicate_pairs, project_id, entries, threshold, opts}, _from, state) do
def handle_call({:duplicate_pairs, project_id, entries, threshold, opts}, from, state) do
case ensure_loaded(state, project_id) do
{:ok, %{index: nil}, state} ->
{:reply, {:error, :missing}, state}
{:ok, entry, state} ->
{:reply, {:ok, scan_duplicates(entry, entries, threshold, opts)}, state}
state = start_duplicate_scan(state, project_id, entry, entries, threshold, opts, from)
{:noreply, state}
{:missing, state} ->
{:reply, {:error, :missing}, state}
end
end
def handle_call({:flush, project_id}, _from, state) do
{:reply, :ok, save_now(state, project_id)}
def handle_call({:flush, project_id}, from, state) do
case Map.get(state, project_id) do
%{build: %{}} = entry ->
entry = update_in(entry.build.flush_waiters, &[from | &1])
{:noreply, Map.put(state, project_id, entry)}
_other ->
{:reply, :ok, save_now(state, project_id)}
end
end
def handle_call(:flush_all, _from, state) do
state = Enum.reduce(Map.keys(state), state, &save_now(&2, &1))
{:reply, :ok, state}
def handle_call(:flush_all, from, state) do
if builds_in_progress?(state) do
state = update_meta(state, fn meta -> %{meta | flush_all_waiters: [from | meta.flush_all_waiters]} end)
{:noreply, state}
else
state = flush_all_projects(state)
{:reply, :ok, state}
end
end
def handle_call({:forget, project_id}, _from, state) do
case Map.get(state, project_id) do
%{timer: timer} when is_reference(timer) -> Process.cancel_timer(timer)
_other -> :ok
end
{:reply, :ok, Map.delete(state, project_id)}
{:reply, :ok, forget_project(state, project_id)}
end
@impl true
@@ -161,11 +169,44 @@ defmodule BDS.Embeddings.Index do
{:noreply, save_now(state, project_id)}
end
def handle_info({ref, built_entry}, state) when is_reference(ref) do
case find_build_owner(state, ref) do
{:ok, project_id, entry} ->
Process.demonitor(ref, [:flush])
{:noreply, complete_build(state, project_id, entry, built_entry)}
:error ->
case find_scan_owner(state, ref) do
{:ok, project_id, entry, %{from: from}} ->
Process.demonitor(ref, [:flush])
GenServer.reply(from, {:ok, built_entry})
entry = %{entry | scans: Map.delete(entry.scans, ref)}
{:noreply, Map.put(state, project_id, entry)}
:error ->
{:noreply, state}
end
end
end
def handle_info({:DOWN, ref, :process, _pid, reason}, state) when is_reference(ref) do
case find_build_owner(state, ref) do
{:ok, _project_id, _entry} ->
exit({:index_build_failed, reason})
:error ->
case find_scan_owner(state, ref) do
{:ok, _project_id, _entry, _scan} -> exit({:duplicate_scan_failed, reason})
:error -> {:noreply, state}
end
end
end
def handle_info(_message, state), do: {:noreply, state}
@impl true
def terminate(_reason, state) do
Enum.each(Map.keys(state), &save_now(state, &1))
Enum.each(project_ids(state), &save_now(state, &1))
:ok
end
@@ -285,6 +326,9 @@ defmodule BDS.Embeddings.Index do
nil ->
state
%{build: %{}} = entry ->
Map.put(state, project_id, entry)
entry ->
if is_reference(entry.timer), do: Process.cancel_timer(entry.timer)
persist(project_id, entry)
@@ -317,7 +361,10 @@ defmodule BDS.Embeddings.Index do
case Map.get(state, project_id) do
nil ->
case load_from_disk(project_id) do
{:ok, entry} -> {:ok, entry, Map.put(state, project_id, entry)}
{:ok, entry} ->
entry = runtime_entry(entry)
{:ok, entry, Map.put(state, project_id, entry)}
:error -> {:missing, state}
end
@@ -356,6 +403,168 @@ defmodule BDS.Embeddings.Index do
defp meta_path(index_path), do: index_path <> ".meta.json"
defp runtime_entry(entry) do
Map.merge(%{timer: nil, build: nil, scans: %{}}, entry)
end
defp start_build(state, project_id, dimensions, entries, from) do
entry =
state
|> Map.get(project_id, runtime_entry(%{index: nil, labels: %{}, dim: dimensions, timer: nil}))
|> Map.put(:dim, dimensions)
case entry.build do
nil ->
task = start_build_task(project_id, dimensions, entries)
build = %{ref: task.ref, pid: task.pid, callers: [from], flush_waiters: [], next_request: nil}
Map.put(state, project_id, %{entry | build: build})
build ->
build = %{build | callers: [from | build.callers], next_request: {dimensions, entries}}
Map.put(state, project_id, %{entry | build: build})
end
end
defp start_build_task(project_id, dimensions, entries) do
Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn ->
maybe_run_test_hook({:before_build, project_id, self()})
build_entry(dimensions, entries)
end)
end
defp complete_build(state, project_id, entry, built_entry) do
build = entry.build
case build.next_request do
{next_dimensions, next_entries} ->
task = start_build_task(project_id, next_dimensions, next_entries)
build = %{build | ref: task.ref, pid: task.pid, next_request: nil}
Map.put(state, project_id, %{entry | build: build})
nil ->
Enum.each(build.callers, &GenServer.reply(&1, :ok))
entry = %{runtime_entry(built_entry) | scans: entry.scans}
state = Map.put(state, project_id, entry)
state =
if build.flush_waiters == [] do
schedule_save(state, project_id)
else
Enum.each(build.flush_waiters, &GenServer.reply(&1, :ok))
save_now(state, project_id)
end
maybe_finish_flush_all_waiters(state)
end
end
defp start_duplicate_scan(state, project_id, entry, entries, threshold, opts, from) do
task =
Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn ->
scan_duplicates(entry, entries, threshold, opts)
end)
scans = Map.put(entry.scans, task.ref, %{pid: task.pid, from: from})
Map.put(state, project_id, %{entry | scans: scans})
end
defp forget_project(state, project_id) do
case Map.get(state, project_id) do
nil ->
maybe_finish_flush_all_waiters(state)
entry ->
if is_reference(entry.timer), do: Process.cancel_timer(entry.timer)
if build = entry.build do
_ = Task.Supervisor.terminate_child(BDS.Tasks.TaskSupervisor, build.pid)
Enum.each(build.callers, &GenServer.reply(&1, :ok))
Enum.each(build.flush_waiters, &GenServer.reply(&1, :ok))
end
Enum.each(entry.scans, fn {_ref, %{pid: pid, from: from}} ->
_ = Task.Supervisor.terminate_child(BDS.Tasks.TaskSupervisor, pid)
GenServer.reply(from, {:error, :missing})
end)
state
|> Map.delete(project_id)
|> maybe_finish_flush_all_waiters()
end
end
defp find_build_owner(state, ref) do
Enum.find_value(project_ids(state), :error, fn project_id ->
case Map.get(state, project_id) do
%{build: %{ref: ^ref}} = entry -> {:ok, project_id, entry}
_other -> false
end
end)
end
defp find_scan_owner(state, ref) do
Enum.find_value(project_ids(state), :error, fn project_id ->
case Map.get(state, project_id) do
%{scans: scans} = entry ->
case Map.get(scans, ref) do
nil -> false
scan -> {:ok, project_id, entry, scan}
end
_other ->
false
end
end)
end
defp project_ids(state) do
state
|> Map.keys()
|> Enum.filter(&is_binary/1)
end
defp builds_in_progress?(state) do
Enum.any?(project_ids(state), fn project_id ->
match?(%{build: %{}} , Map.get(state, project_id))
end)
end
defp flush_all_projects(state) do
Enum.reduce(project_ids(state), state, &save_now(&2, &1))
end
defp maybe_finish_flush_all_waiters(state) do
meta = meta(state)
cond do
meta.flush_all_waiters == [] ->
state
builds_in_progress?(state) ->
state
true ->
state = flush_all_projects(state)
Enum.each(meta.flush_all_waiters, &GenServer.reply(&1, :ok))
put_meta(state, %{meta | flush_all_waiters: []})
end
end
defp meta(state), do: Map.get(state, @meta_key, %{flush_all_waiters: []})
defp put_meta(state, meta), do: Map.put(state, @meta_key, meta)
defp update_meta(state, fun), do: put_meta(state, fun.(meta(state)))
defp maybe_run_test_hook(event) do
case Application.get_env(:bds, :embeddings_index_test_hook) do
callback when is_function(callback, 1) -> callback.(event)
_other -> :ok
end
end
defp sort_pair(post_id_a, post_id_b) when post_id_a <= post_id_b, do: {post_id_a, post_id_b}
defp sort_pair(post_id_a, post_id_b), do: {post_id_b, post_id_a}

View File

@@ -691,4 +691,101 @@ defmodule BDS.EmbeddingsTest do
assert is_nil(entry.timer)
end
end
describe "non-blocking index work" do
alias BDS.Embeddings.Index
defp index_entries(count) do
for seed <- 1..count do
%{label: seed, post_id: 100 + seed, vector: packed_vector(seed)}
end
end
test "neighbors stay available while duplicate scanning is in flight", %{project: project} do
entries = index_entries(4)
:ok = Index.put(project.id, 384, entries)
test_pid = self()
scan_ref = make_ref()
duplicate_task =
Task.async(fn ->
Index.duplicate_pairs(project.id, entries, 0.0,
on_progress: fn progress, message ->
if Process.get({:duplicate_scan_blocked, scan_ref}) do
:ok
else
Process.put({:duplicate_scan_blocked, scan_ref}, true)
send(test_pid, {:duplicate_scan_progress, scan_ref, self(), progress, message})
receive do
{:release_duplicate_scan, ^scan_ref} -> :ok
end
end
end
)
end)
assert_receive {:duplicate_scan_progress, ^scan_ref, scan_worker_pid, _progress, _message}, 1_000
assert {:ok, neighbors} = Index.neighbors(project.id, 1, packed_vector(1), 1)
assert [%{post_id: 102}] = neighbors
assert Task.yield(duplicate_task, 0) == nil
send(scan_worker_pid, {:release_duplicate_scan, scan_ref})
assert {:ok, {:ok, pairs}} = Task.yield(duplicate_task, 1_000)
assert is_list(pairs)
end
test "neighbors keep using the previous index while a rebuild is in flight", %{project: project} do
project_id = project.id
test_pid = self()
original_entries = index_entries(2)
replacement_entries = index_entries(3)
:ok = Index.put(project_id, 384, original_entries)
original_hook = Application.get_env(:bds, :embeddings_index_test_hook)
Application.put_env(:bds, :embeddings_index_test_hook, fn event ->
case event do
{:before_build, ^project_id, build_pid} ->
send(test_pid, {:index_build_blocked, project_id, build_pid})
receive do
{:release_index_build, ^project_id} -> :ok
end
_other ->
:ok
end
end)
on_exit(fn ->
if is_nil(original_hook) do
Application.delete_env(:bds, :embeddings_index_test_hook)
else
Application.put_env(:bds, :embeddings_index_test_hook, original_hook)
end
end)
rebuild_task =
Task.async(fn ->
Index.put(project_id, 384, replacement_entries)
end)
assert_receive {:index_build_blocked, ^project_id, build_pid}, 1_000
assert {:ok, neighbors} = Index.neighbors(project.id, 1, packed_vector(1), 1)
assert [%{post_id: 102}] = neighbors
assert Process.alive?(build_pid)
assert Task.yield(rebuild_task, 0) == nil
send(build_pid, {:release_index_build, project_id})
assert {:ok, :ok} = Task.yield(rebuild_task, 1_000)
assert {:ok, neighbors_after_rebuild} = Index.neighbors(project.id, 1, packed_vector(1), 2)
assert Enum.any?(neighbors_after_rebuild, &(&1.post_id == 102))
assert Enum.any?(neighbors_after_rebuild, &(&1.post_id == 103))
end
end
end