From e29dfb490afbc7ae2d8ee58690e618bc60e329ee Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Wed, 27 May 2026 19:03:21 +0200 Subject: [PATCH] fix(perf): replace Enum.each + individual inserts with preloaded keys and batch upsert in embeddings (CSM-033) --- CODESMELL.md | 18 ++- lib/bds/embeddings.ex | 123 ++++++++++++--- test/bds/csm033_batch_inserts_test.exs | 206 +++++++++++++++++++++++++ 3 files changed, 322 insertions(+), 25 deletions(-) create mode 100644 test/bds/csm033_batch_inserts_test.exs diff --git a/CODESMELL.md b/CODESMELL.md index 92a053c..a022c90 100644 --- a/CODESMELL.md +++ b/CODESMELL.md @@ -500,10 +500,20 @@ --- -### CSM-033 — `Enum.each` with Side Effects That Should Be Batch Inserts -- **Files:** `lib/bds/search.ex:174-177`, `lib/bds/embeddings.ex` -- **What:** `Enum.each` used for inserting records. The side-effect pattern is fine, but `Enum.map` + `Repo.insert_all` would be much faster for bulk inserts. -- **Fix:** Use `Repo.insert_all` for batch inserts instead of `Enum.each` + `Repo.insert`. +### ~~CSM-033 — `Enum.each` with Side Effects That Should Be Batch Inserts~~ ✅ FIXED +- **Fixed:** 2026-05-27 +- **What was done:** + - **`lib/bds/search.ex`** — Already addressed by CSM-006; `batch_insert_post_index` and `batch_insert_media_index` use multi-row SQL INSERT with chunking. + - **`lib/bds/embeddings.ex`** — Replaced `Enum.each` + per-post `sync_post_if_enabled` (which did N individual `Repo.get_by` reads + N individual `Repo.insert_or_update` writes) in three bulk functions: + - `rebuild_project/2` — preloads all keys via `preload_keys_by_post_id/1`, computes rows with `compute_key_data/3`, batch-upserts with `batch_upsert_keys/1`. + - `repair_posts/2` — same pattern with `preload_keys_by_post_id/2` scoped to target post IDs. + - `index_unindexed/1` — same pattern, eliminating per-post `Repo.get_by` lookups. + - Added `preload_keys_by_post_id/1` and `/2` — single-query key preload into a map by post_id. + - Added `max_label_value/0` — reads max label once instead of per-post `next_label()` queries. + - Added `compute_key_data/3` — resolves body, hashes, embeds (if needed), returns `:skip` or `{:upsert, row}`. + - Added `batch_upsert_keys/1` — multi-row `INSERT INTO embedding_keys ... ON CONFLICT(label) DO UPDATE` with 199-row chunking (SQLite 999-param limit ÷ 5 columns). + - `sync_post_if_enabled/2` retained for single-post `sync_post/1` path (CRUD operations). + - Added 11 tests in `test/bds/csm033_batch_inserts_test.exs`: source-level assertions (no Enum.each+sync_post_if_enabled, batch_upsert_keys present, preload present, ON CONFLICT upsert, compute_key_data used), search.ex batch verification, and functional tests (index 5 posts, rebuild updates stale keys, repair targets subset, skip on matching hash). --- diff --git a/lib/bds/embeddings.ex b/lib/bds/embeddings.ex index 0759c6a..a4ecd95 100644 --- a/lib/bds/embeddings.ex +++ b/lib/bds/embeddings.ex @@ -15,6 +15,7 @@ defmodule BDS.Embeddings do @duplicate_threshold 0.92 @exact_match_score 0.999999 + @key_batch_size 199 def model_id, do: configured_backend().model_info().model_id def dimensions, do: configured_backend().model_info().dimensions @@ -73,7 +74,24 @@ defmodule BDS.Embeddings do order_by: [asc: post.created_at, asc: post.slug] ) - Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false)) + existing_keys = preload_keys_by_post_id(project_id, Enum.map(posts, & &1.id)) + base_label = max_label_value() + + {rows, _next_label} = + Enum.reduce(posts, {[], base_label + 1}, fn post, {acc, next_label} -> + existing_key = Map.get(existing_keys, post.id) + + case compute_key_data(post, existing_key, next_label) do + :skip -> + {acc, next_label} + + {:upsert, row} -> + bump = if existing_key, do: 0, else: 1 + {[row | acc], next_label + bump} + end + end) + + batch_upsert_keys(rows) :ok = rebuild_snapshot(project_id) {:ok, Enum.map(posts, & &1.id)} else @@ -104,12 +122,27 @@ defmodule BDS.Embeddings do where: key.project_id == ^project_id and key.post_id not in ^post_ids ) - posts - |> Enum.with_index(1) - |> Enum.each(fn {post, index} -> - sync_post_if_enabled(post, refresh_index: false) - :ok = report_rebuild_progress(on_progress, index, total_posts, "embedding entries") - end) + existing_keys = preload_keys_by_post_id(project_id) + base_label = max_label_value() + + {rows, _next_label} = + posts + |> Enum.with_index(1) + |> Enum.reduce({[], base_label + 1}, fn {post, index}, {acc, next_label} -> + :ok = report_rebuild_progress(on_progress, index, total_posts, "embedding entries") + existing_key = Map.get(existing_keys, post.id) + + case compute_key_data(post, existing_key, next_label) do + :skip -> + {acc, next_label} + + {:upsert, row} -> + bump = if existing_key, do: 0, else: 1 + {[row | acc], next_label + bump} + end + end) + + batch_upsert_keys(rows) :ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot") :ok = rebuild_snapshot(project_id) @@ -196,6 +229,53 @@ defmodule BDS.Embeddings do end end + defp preload_keys_by_post_id(project_id) do + Repo.all(from key in Key, where: key.project_id == ^project_id) + |> Map.new(&{&1.post_id, &1}) + end + + defp preload_keys_by_post_id(project_id, post_ids) do + Repo.all( + from key in Key, + where: key.project_id == ^project_id and key.post_id in ^post_ids + ) + |> Map.new(&{&1.post_id, &1}) + end + + defp max_label_value do + Repo.one(from key in Key, select: max(key.label)) || 0 + end + + defp compute_key_data(%Post{} = post, existing_key, next_label) do + body = resolve_post_body(post) + raw_text = compose_embedding_source(post.title, body) + content_hash = hash_text(raw_text) + + if existing_key && existing_key.content_hash == content_hash do + :skip + else + {:ok, vector} = embed_text(raw_text, post.language) + label = if existing_key, do: existing_key.label, else: next_label + {:upsert, [label, post.id, post.project_id, content_hash, Jason.encode!(vector)]} + end + end + + defp batch_upsert_keys([]), do: :ok + + defp batch_upsert_keys(rows) do + rows + |> Enum.chunk_every(@key_batch_size) + |> Enum.each(fn chunk -> + placeholders = Enum.map_join(chunk, ", ", fn _ -> "(?, ?, ?, ?, ?)" end) + params = List.flatten(chunk) + + Repo.query!( + "INSERT INTO embedding_keys (label, post_id, project_id, content_hash, vector) VALUES #{placeholders} ON CONFLICT(label) DO UPDATE SET content_hash = excluded.content_hash, vector = excluded.vector", + params + ) + end) + end + def remove_post(post_id) when is_binary(post_id) do project_id = case Repo.get_by(Key, post_id: post_id) do @@ -227,23 +307,24 @@ defmodule BDS.Embeddings do order_by: [asc: post.created_at, asc: post.slug] ) - Enum.each(posts, fn post -> - body = resolve_post_body(post) - content_hash = hash_text(compose_embedding_source(post.title, body)) + existing_keys = preload_keys_by_post_id(project_id) + base_label = max_label_value() - case Repo.get_by(Key, post_id: post.id, project_id: project_id) do - %Key{content_hash: ^content_hash} -> - :ok + {rows, _next_label} = + Enum.reduce(posts, {[], base_label + 1}, fn post, {acc, next_label} -> + existing_key = Map.get(existing_keys, post.id) - _other -> - :ok = - sync_post_if_enabled( - %{post | content: if(post.content in [nil, ""], do: body, else: post.content)}, - refresh_index: false - ) - end - end) + case compute_key_data(post, existing_key, next_label) do + :skip -> + {acc, next_label} + {:upsert, row} -> + bump = if existing_key, do: 0, else: 1 + {[row | acc], next_label + bump} + end + end) + + batch_upsert_keys(rows) :ok = rebuild_snapshot(project_id) indexed = diff --git a/test/bds/csm033_batch_inserts_test.exs b/test/bds/csm033_batch_inserts_test.exs new file mode 100644 index 0000000..a819dd4 --- /dev/null +++ b/test/bds/csm033_batch_inserts_test.exs @@ -0,0 +1,206 @@ +defmodule BDS.CSM033BatchInsertsTest do + use ExUnit.Case, async: false + import Ecto.Query + + describe "source-level: embeddings.ex uses batch inserts instead of Enum.each + individual writes" do + setup do + source = File.read!("lib/bds/embeddings.ex") + %{source: source} + end + + test "no Enum.each calling sync_post_if_enabled in bulk paths", %{source: source} do + refute source =~ "Enum.each(posts, &sync_post_if_enabled", + "bulk paths should not use Enum.each with sync_post_if_enabled" + + refute source =~ ~r/Enum\.each\(fn \{post, index\} ->\n\s+sync_post_if_enabled/, + "bulk paths should not use Enum.each with sync_post_if_enabled" + end + + test "bulk functions use batch_upsert_keys", %{source: source} do + assert source =~ "batch_upsert_keys(rows)", + "expected batch_upsert_keys to be called with collected rows" + end + + test "bulk functions preload keys before the loop", %{source: source} do + assert source =~ "preload_keys_by_post_id(project_id)", + "expected keys to be preloaded in a single query" + end + + test "batch_upsert_keys uses multi-row INSERT with ON CONFLICT upsert", %{source: source} do + assert source =~ "INSERT INTO embedding_keys", + "expected raw SQL batch INSERT for embedding keys" + + assert source =~ "ON CONFLICT(label) DO UPDATE", + "expected ON CONFLICT upsert clause" + end + + test "compute_key_data is used instead of individual Repo.insert_or_update", %{source: source} do + assert source =~ "compute_key_data(post, existing_key, next_label)", + "expected compute_key_data helper for row computation" + end + end + + describe "source-level: search.ex already uses batch inserts" do + test "batch_insert_post_index uses multi-row VALUES" do + source = File.read!("lib/bds/search.ex") + assert source =~ "batch_insert_post_index" + assert source =~ ~r/INSERT INTO posts_fts.*VALUES.*\#\{placeholders\}/s + end + + test "batch_insert_media_index uses multi-row VALUES" do + source = File.read!("lib/bds/search.ex") + assert source =~ "batch_insert_media_index" + assert source =~ ~r/INSERT INTO media_fts.*VALUES.*\#\{placeholders\}/s + end + end + + describe "functional: batch operations produce correct results" do + defmodule FakeBackend do + @behaviour BDS.Embeddings.Backend + + @impl true + def model_info, do: %{model_id: "fake/test-model", dimensions: 384} + + @impl true + def embed(text, opts), do: BDS.Embeddings.Backends.InApp.embed(text, opts) + end + + setup do + :ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo) + + temp_dir = + Path.join(System.tmp_dir!(), "bds-csm033-#{System.unique_integer([:positive])}") + + File.mkdir_p!(temp_dir) + on_exit(fn -> File.rm_rf(temp_dir) end) + + {:ok, project} = BDS.Projects.create_project(%{name: "CSM033", data_path: temp_dir}) + + previous_config = Application.get_env(:bds, :embeddings) + Application.put_env(:bds, :embeddings, backend: FakeBackend) + + on_exit(fn -> + if previous_config == nil do + Application.delete_env(:bds, :embeddings) + else + Application.put_env(:bds, :embeddings, previous_config) + end + end) + + assert {:ok, _metadata} = + BDS.Metadata.update_project_metadata(project.id, %{ + semantic_similarity_enabled: true + }) + + %{project: project} + end + + test "index_unindexed batch-inserts keys for multiple posts", %{project: project} do + posts = + for i <- 1..5 do + {:ok, post} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Post #{i}", + content: "content for post number #{i} with unique words #{:rand.uniform(10000)}", + language: "en" + }) + + post + end + + {:ok, indexed} = BDS.Embeddings.index_unindexed(project.id) + assert length(indexed) == 5 + assert Enum.all?(posts, fn post -> post.id in indexed end) + + keys = + BDS.Repo.all( + from(k in BDS.Embeddings.Key, where: k.project_id == ^project.id) + ) + + assert length(keys) == 5 + labels = Enum.map(keys, & &1.label) |> Enum.sort() + assert labels == Enum.to_list(1..5) + end + + test "rebuild_project updates stale keys via batch upsert", %{project: project} do + {:ok, post} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Rebuild Target", + content: "original content for rebuild test", + language: "en" + }) + + {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id) + + original_key = + BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id) + + {:ok, _post} = BDS.Posts.update_post(post.id, %{content: "completely different content now"}) + + {:ok, rebuilt_ids} = BDS.Embeddings.rebuild_project(project.id) + assert post.id in rebuilt_ids + + updated_key = + BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id) + + assert updated_key.label == original_key.label + assert updated_key.content_hash != original_key.content_hash + end + + test "repair_posts batch-upserts for specified posts only", %{project: project} do + {:ok, post_a} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Repair A", + content: "content A", + language: "en" + }) + + {:ok, _post_b} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Repair B", + content: "content B", + language: "en" + }) + + {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id) + {:ok, repaired} = BDS.Embeddings.repair_posts(project.id, [post_a.id]) + assert repaired == [post_a.id] + + keys = + BDS.Repo.all( + from(k in BDS.Embeddings.Key, where: k.project_id == ^project.id) + ) + + assert length(keys) == 2 + end + + test "index_unindexed skips posts with matching content hash", %{project: project} do + {:ok, post} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Skip Test", + content: "unchanged content for skip test", + language: "en" + }) + + {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id) + + key_before = + BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id) + + {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id) + + key_after = + BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id) + + assert key_before.label == key_after.label + assert key_before.content_hash == key_after.content_hash + assert key_before.vector == key_after.vector + end + end + +end