From 4859c9708a15c455d7f26cf8ff340bafaf72f2c2 Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Fri, 12 Jun 2026 12:47:01 +0200 Subject: [PATCH] Close TD-12 non-blocking embeddings index work --- TECHDEBTS.md | 13 +- lib/bds/embeddings/index.ex | 249 ++++++++++++++++++++++++++++++++--- test/bds/embeddings_test.exs | 97 ++++++++++++++ 3 files changed, 338 insertions(+), 21 deletions(-) diff --git a/TECHDEBTS.md b/TECHDEBTS.md index 4388707..31371b2 100644 --- a/TECHDEBTS.md +++ b/TECHDEBTS.md @@ -485,7 +485,18 @@ renderers via monitors or a counter. **Acceptance.** A test issues N concurrent slow renders and asserts they overlap (wall time « N × single render); stop_preview still drains correctly. -### TD-12: Move HNSW builds and duplicate scans out of `Embeddings.Index` handle_call +### TD-12: Move HNSW builds and duplicate scans out of `Embeddings.Index` handle_call ✅ DONE (2026-06-12) + +**Status: implemented.** `BDS.Embeddings.Index` now runs duplicate scans and +HNSW rebuilds in supervised tasks instead of inside `handle_call`, while the +GenServer keeps only the small serialized state surface (current index, label +map, debounce timers, and flush coordination). Neighbor queries continue to hit +the current index while a scan or rebuild is in flight; rebuild requests for a +project coalesce onto the latest requested snapshot; `flush/1` and `flush_all/0` +wait for in-flight rebuilds before persisting; and `forget/1` cancels pending +index work cleanly. Acceptance proof now includes focused concurrency tests for +both slow duplicate scans and slow rebuilds, and the existing debounced +persistence coverage remains green. **Context.** `Embeddings.Index` (singleton) builds HNSW graphs and runs full duplicate scans inside `handle_call` with client timeout `:infinity`. A long diff --git a/lib/bds/embeddings/index.ex b/lib/bds/embeddings/index.ex index fab9d76..88b6b9d 100644 --- a/lib/bds/embeddings/index.ex +++ b/lib/bds/embeddings/index.ex @@ -30,6 +30,7 @@ defmodule BDS.Embeddings.Index do @m 16 @ef_construction 128 @ef_search 64 + @meta_key :"$meta" # ─── Public API ───────────────────────────────────────────── @@ -98,18 +99,17 @@ defmodule BDS.Embeddings.Index do @impl true def init(_opts) do Process.flag(:trap_exit, true) - {:ok, %{}} + {:ok, %{@meta_key => %{flush_all_waiters: []}}} end @impl true - def handle_call({:put, project_id, dimensions, entries}, _from, state) do + def handle_call({:put, project_id, dimensions, entries}, from, state) do # Cancel any pending debounce for this project first: build_entry/2 returns a # fresh entry with timer: nil, so without this the previous timer would be # orphaned (left to fire a redundant save) instead of coalescing. state = cancel_pending_save(state, project_id) - entry = build_entry(dimensions, entries) - state = state |> Map.put(project_id, entry) |> schedule_save(project_id) - {:reply, :ok, state} + state = start_build(state, project_id, dimensions, entries, from) + {:noreply, state} end def handle_call({:neighbors, project_id, query_label, query_vector, limit}, _from, state) do @@ -125,35 +125,43 @@ defmodule BDS.Embeddings.Index do end end - def handle_call({:duplicate_pairs, project_id, entries, threshold, opts}, _from, state) do + def handle_call({:duplicate_pairs, project_id, entries, threshold, opts}, from, state) do case ensure_loaded(state, project_id) do {:ok, %{index: nil}, state} -> {:reply, {:error, :missing}, state} {:ok, entry, state} -> - {:reply, {:ok, scan_duplicates(entry, entries, threshold, opts)}, state} + state = start_duplicate_scan(state, project_id, entry, entries, threshold, opts, from) + {:noreply, state} {:missing, state} -> {:reply, {:error, :missing}, state} end end - def handle_call({:flush, project_id}, _from, state) do - {:reply, :ok, save_now(state, project_id)} + def handle_call({:flush, project_id}, from, state) do + case Map.get(state, project_id) do + %{build: %{}} = entry -> + entry = update_in(entry.build.flush_waiters, &[from | &1]) + {:noreply, Map.put(state, project_id, entry)} + + _other -> + {:reply, :ok, save_now(state, project_id)} + end end - def handle_call(:flush_all, _from, state) do - state = Enum.reduce(Map.keys(state), state, &save_now(&2, &1)) - {:reply, :ok, state} + def handle_call(:flush_all, from, state) do + if builds_in_progress?(state) do + state = update_meta(state, fn meta -> %{meta | flush_all_waiters: [from | meta.flush_all_waiters]} end) + {:noreply, state} + else + state = flush_all_projects(state) + {:reply, :ok, state} + end end def handle_call({:forget, project_id}, _from, state) do - case Map.get(state, project_id) do - %{timer: timer} when is_reference(timer) -> Process.cancel_timer(timer) - _other -> :ok - end - - {:reply, :ok, Map.delete(state, project_id)} + {:reply, :ok, forget_project(state, project_id)} end @impl true @@ -161,11 +169,44 @@ defmodule BDS.Embeddings.Index do {:noreply, save_now(state, project_id)} end + def handle_info({ref, built_entry}, state) when is_reference(ref) do + case find_build_owner(state, ref) do + {:ok, project_id, entry} -> + Process.demonitor(ref, [:flush]) + {:noreply, complete_build(state, project_id, entry, built_entry)} + + :error -> + case find_scan_owner(state, ref) do + {:ok, project_id, entry, %{from: from}} -> + Process.demonitor(ref, [:flush]) + GenServer.reply(from, {:ok, built_entry}) + entry = %{entry | scans: Map.delete(entry.scans, ref)} + {:noreply, Map.put(state, project_id, entry)} + + :error -> + {:noreply, state} + end + end + end + + def handle_info({:DOWN, ref, :process, _pid, reason}, state) when is_reference(ref) do + case find_build_owner(state, ref) do + {:ok, _project_id, _entry} -> + exit({:index_build_failed, reason}) + + :error -> + case find_scan_owner(state, ref) do + {:ok, _project_id, _entry, _scan} -> exit({:duplicate_scan_failed, reason}) + :error -> {:noreply, state} + end + end + end + def handle_info(_message, state), do: {:noreply, state} @impl true def terminate(_reason, state) do - Enum.each(Map.keys(state), &save_now(state, &1)) + Enum.each(project_ids(state), &save_now(state, &1)) :ok end @@ -285,6 +326,9 @@ defmodule BDS.Embeddings.Index do nil -> state + %{build: %{}} = entry -> + Map.put(state, project_id, entry) + entry -> if is_reference(entry.timer), do: Process.cancel_timer(entry.timer) persist(project_id, entry) @@ -317,7 +361,10 @@ defmodule BDS.Embeddings.Index do case Map.get(state, project_id) do nil -> case load_from_disk(project_id) do - {:ok, entry} -> {:ok, entry, Map.put(state, project_id, entry)} + {:ok, entry} -> + entry = runtime_entry(entry) + {:ok, entry, Map.put(state, project_id, entry)} + :error -> {:missing, state} end @@ -356,6 +403,168 @@ defmodule BDS.Embeddings.Index do defp meta_path(index_path), do: index_path <> ".meta.json" + defp runtime_entry(entry) do + Map.merge(%{timer: nil, build: nil, scans: %{}}, entry) + end + + defp start_build(state, project_id, dimensions, entries, from) do + entry = + state + |> Map.get(project_id, runtime_entry(%{index: nil, labels: %{}, dim: dimensions, timer: nil})) + |> Map.put(:dim, dimensions) + + case entry.build do + nil -> + task = start_build_task(project_id, dimensions, entries) + build = %{ref: task.ref, pid: task.pid, callers: [from], flush_waiters: [], next_request: nil} + Map.put(state, project_id, %{entry | build: build}) + + build -> + build = %{build | callers: [from | build.callers], next_request: {dimensions, entries}} + Map.put(state, project_id, %{entry | build: build}) + end + end + + defp start_build_task(project_id, dimensions, entries) do + Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn -> + maybe_run_test_hook({:before_build, project_id, self()}) + build_entry(dimensions, entries) + end) + end + + defp complete_build(state, project_id, entry, built_entry) do + build = entry.build + + case build.next_request do + {next_dimensions, next_entries} -> + task = start_build_task(project_id, next_dimensions, next_entries) + + build = %{build | ref: task.ref, pid: task.pid, next_request: nil} + Map.put(state, project_id, %{entry | build: build}) + + nil -> + Enum.each(build.callers, &GenServer.reply(&1, :ok)) + + entry = %{runtime_entry(built_entry) | scans: entry.scans} + state = Map.put(state, project_id, entry) + + state = + if build.flush_waiters == [] do + schedule_save(state, project_id) + else + Enum.each(build.flush_waiters, &GenServer.reply(&1, :ok)) + save_now(state, project_id) + end + + maybe_finish_flush_all_waiters(state) + end + end + + defp start_duplicate_scan(state, project_id, entry, entries, threshold, opts, from) do + task = + Task.Supervisor.async_nolink(BDS.Tasks.TaskSupervisor, fn -> + scan_duplicates(entry, entries, threshold, opts) + end) + + scans = Map.put(entry.scans, task.ref, %{pid: task.pid, from: from}) + Map.put(state, project_id, %{entry | scans: scans}) + end + + defp forget_project(state, project_id) do + case Map.get(state, project_id) do + nil -> + maybe_finish_flush_all_waiters(state) + + entry -> + if is_reference(entry.timer), do: Process.cancel_timer(entry.timer) + + if build = entry.build do + _ = Task.Supervisor.terminate_child(BDS.Tasks.TaskSupervisor, build.pid) + Enum.each(build.callers, &GenServer.reply(&1, :ok)) + Enum.each(build.flush_waiters, &GenServer.reply(&1, :ok)) + end + + Enum.each(entry.scans, fn {_ref, %{pid: pid, from: from}} -> + _ = Task.Supervisor.terminate_child(BDS.Tasks.TaskSupervisor, pid) + GenServer.reply(from, {:error, :missing}) + end) + + state + |> Map.delete(project_id) + |> maybe_finish_flush_all_waiters() + end + end + + defp find_build_owner(state, ref) do + Enum.find_value(project_ids(state), :error, fn project_id -> + case Map.get(state, project_id) do + %{build: %{ref: ^ref}} = entry -> {:ok, project_id, entry} + _other -> false + end + end) + end + + defp find_scan_owner(state, ref) do + Enum.find_value(project_ids(state), :error, fn project_id -> + case Map.get(state, project_id) do + %{scans: scans} = entry -> + case Map.get(scans, ref) do + nil -> false + scan -> {:ok, project_id, entry, scan} + end + + _other -> + false + end + end) + end + + defp project_ids(state) do + state + |> Map.keys() + |> Enum.filter(&is_binary/1) + end + + defp builds_in_progress?(state) do + Enum.any?(project_ids(state), fn project_id -> + match?(%{build: %{}} , Map.get(state, project_id)) + end) + end + + defp flush_all_projects(state) do + Enum.reduce(project_ids(state), state, &save_now(&2, &1)) + end + + defp maybe_finish_flush_all_waiters(state) do + meta = meta(state) + + cond do + meta.flush_all_waiters == [] -> + state + + builds_in_progress?(state) -> + state + + true -> + state = flush_all_projects(state) + Enum.each(meta.flush_all_waiters, &GenServer.reply(&1, :ok)) + put_meta(state, %{meta | flush_all_waiters: []}) + end + end + + defp meta(state), do: Map.get(state, @meta_key, %{flush_all_waiters: []}) + + defp put_meta(state, meta), do: Map.put(state, @meta_key, meta) + + defp update_meta(state, fun), do: put_meta(state, fun.(meta(state))) + + defp maybe_run_test_hook(event) do + case Application.get_env(:bds, :embeddings_index_test_hook) do + callback when is_function(callback, 1) -> callback.(event) + _other -> :ok + end + end + defp sort_pair(post_id_a, post_id_b) when post_id_a <= post_id_b, do: {post_id_a, post_id_b} defp sort_pair(post_id_a, post_id_b), do: {post_id_b, post_id_a} diff --git a/test/bds/embeddings_test.exs b/test/bds/embeddings_test.exs index 9f555a9..ba7a4bd 100644 --- a/test/bds/embeddings_test.exs +++ b/test/bds/embeddings_test.exs @@ -691,4 +691,101 @@ defmodule BDS.EmbeddingsTest do assert is_nil(entry.timer) end end + + describe "non-blocking index work" do + alias BDS.Embeddings.Index + + defp index_entries(count) do + for seed <- 1..count do + %{label: seed, post_id: 100 + seed, vector: packed_vector(seed)} + end + end + + test "neighbors stay available while duplicate scanning is in flight", %{project: project} do + entries = index_entries(4) + :ok = Index.put(project.id, 384, entries) + + test_pid = self() + scan_ref = make_ref() + + duplicate_task = + Task.async(fn -> + Index.duplicate_pairs(project.id, entries, 0.0, + on_progress: fn progress, message -> + if Process.get({:duplicate_scan_blocked, scan_ref}) do + :ok + else + Process.put({:duplicate_scan_blocked, scan_ref}, true) + send(test_pid, {:duplicate_scan_progress, scan_ref, self(), progress, message}) + + receive do + {:release_duplicate_scan, ^scan_ref} -> :ok + end + end + end + ) + end) + + assert_receive {:duplicate_scan_progress, ^scan_ref, scan_worker_pid, _progress, _message}, 1_000 + + assert {:ok, neighbors} = Index.neighbors(project.id, 1, packed_vector(1), 1) + assert [%{post_id: 102}] = neighbors + assert Task.yield(duplicate_task, 0) == nil + + send(scan_worker_pid, {:release_duplicate_scan, scan_ref}) + assert {:ok, {:ok, pairs}} = Task.yield(duplicate_task, 1_000) + assert is_list(pairs) + end + + test "neighbors keep using the previous index while a rebuild is in flight", %{project: project} do + project_id = project.id + test_pid = self() + original_entries = index_entries(2) + replacement_entries = index_entries(3) + :ok = Index.put(project_id, 384, original_entries) + + original_hook = Application.get_env(:bds, :embeddings_index_test_hook) + + Application.put_env(:bds, :embeddings_index_test_hook, fn event -> + case event do + {:before_build, ^project_id, build_pid} -> + send(test_pid, {:index_build_blocked, project_id, build_pid}) + + receive do + {:release_index_build, ^project_id} -> :ok + end + + _other -> + :ok + end + end) + + on_exit(fn -> + if is_nil(original_hook) do + Application.delete_env(:bds, :embeddings_index_test_hook) + else + Application.put_env(:bds, :embeddings_index_test_hook, original_hook) + end + end) + + rebuild_task = + Task.async(fn -> + Index.put(project_id, 384, replacement_entries) + end) + + assert_receive {:index_build_blocked, ^project_id, build_pid}, 1_000 + + assert {:ok, neighbors} = Index.neighbors(project.id, 1, packed_vector(1), 1) + assert [%{post_id: 102}] = neighbors + assert Process.alive?(build_pid) + assert Task.yield(rebuild_task, 0) == nil + + send(build_pid, {:release_index_build, project_id}) + assert {:ok, :ok} = Task.yield(rebuild_task, 1_000) + + assert {:ok, neighbors_after_rebuild} = Index.neighbors(project.id, 1, packed_vector(1), 2) + assert Enum.any?(neighbors_after_rebuild, &(&1.post_id == 102)) + assert Enum.any?(neighbors_after_rebuild, &(&1.post_id == 103)) + end + end end