fix(perf): replace Enum.each + individual inserts with preloaded keys and batch upsert in embeddings (CSM-033)
This commit is contained in:
18
CODESMELL.md
18
CODESMELL.md
@@ -500,10 +500,20 @@
|
||||
|
||||
---
|
||||
|
||||
### CSM-033 — `Enum.each` with Side Effects That Should Be Batch Inserts
|
||||
- **Files:** `lib/bds/search.ex:174-177`, `lib/bds/embeddings.ex`
|
||||
- **What:** `Enum.each` used for inserting records. The side-effect pattern is fine, but `Enum.map` + `Repo.insert_all` would be much faster for bulk inserts.
|
||||
- **Fix:** Use `Repo.insert_all` for batch inserts instead of `Enum.each` + `Repo.insert`.
|
||||
### ~~CSM-033 — `Enum.each` with Side Effects That Should Be Batch Inserts~~ ✅ FIXED
|
||||
- **Fixed:** 2026-05-27
|
||||
- **What was done:**
|
||||
- **`lib/bds/search.ex`** — Already addressed by CSM-006; `batch_insert_post_index` and `batch_insert_media_index` use multi-row SQL INSERT with chunking.
|
||||
- **`lib/bds/embeddings.ex`** — Replaced `Enum.each` + per-post `sync_post_if_enabled` (which did N individual `Repo.get_by` reads + N individual `Repo.insert_or_update` writes) in three bulk functions:
|
||||
- `rebuild_project/2` — preloads all keys via `preload_keys_by_post_id/1`, computes rows with `compute_key_data/3`, batch-upserts with `batch_upsert_keys/1`.
|
||||
- `repair_posts/2` — same pattern with `preload_keys_by_post_id/2` scoped to target post IDs.
|
||||
- `index_unindexed/1` — same pattern, eliminating per-post `Repo.get_by` lookups.
|
||||
- Added `preload_keys_by_post_id/1` and `/2` — single-query key preload into a map by post_id.
|
||||
- Added `max_label_value/0` — reads max label once instead of per-post `next_label()` queries.
|
||||
- Added `compute_key_data/3` — resolves body, hashes, embeds (if needed), returns `:skip` or `{:upsert, row}`.
|
||||
- Added `batch_upsert_keys/1` — multi-row `INSERT INTO embedding_keys ... ON CONFLICT(label) DO UPDATE` with 199-row chunking (SQLite 999-param limit ÷ 5 columns).
|
||||
- `sync_post_if_enabled/2` retained for single-post `sync_post/1` path (CRUD operations).
|
||||
- Added 11 tests in `test/bds/csm033_batch_inserts_test.exs`: source-level assertions (no Enum.each+sync_post_if_enabled, batch_upsert_keys present, preload present, ON CONFLICT upsert, compute_key_data used), search.ex batch verification, and functional tests (index 5 posts, rebuild updates stale keys, repair targets subset, skip on matching hash).
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ defmodule BDS.Embeddings do
|
||||
|
||||
@duplicate_threshold 0.92
|
||||
@exact_match_score 0.999999
|
||||
@key_batch_size 199
|
||||
|
||||
def model_id, do: configured_backend().model_info().model_id
|
||||
def dimensions, do: configured_backend().model_info().dimensions
|
||||
@@ -73,7 +74,24 @@ defmodule BDS.Embeddings do
|
||||
order_by: [asc: post.created_at, asc: post.slug]
|
||||
)
|
||||
|
||||
Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false))
|
||||
existing_keys = preload_keys_by_post_id(project_id, Enum.map(posts, & &1.id))
|
||||
base_label = max_label_value()
|
||||
|
||||
{rows, _next_label} =
|
||||
Enum.reduce(posts, {[], base_label + 1}, fn post, {acc, next_label} ->
|
||||
existing_key = Map.get(existing_keys, post.id)
|
||||
|
||||
case compute_key_data(post, existing_key, next_label) do
|
||||
:skip ->
|
||||
{acc, next_label}
|
||||
|
||||
{:upsert, row} ->
|
||||
bump = if existing_key, do: 0, else: 1
|
||||
{[row | acc], next_label + bump}
|
||||
end
|
||||
end)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, Enum.map(posts, & &1.id)}
|
||||
else
|
||||
@@ -104,13 +122,28 @@ defmodule BDS.Embeddings do
|
||||
where: key.project_id == ^project_id and key.post_id not in ^post_ids
|
||||
)
|
||||
|
||||
existing_keys = preload_keys_by_post_id(project_id)
|
||||
base_label = max_label_value()
|
||||
|
||||
{rows, _next_label} =
|
||||
posts
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.each(fn {post, index} ->
|
||||
sync_post_if_enabled(post, refresh_index: false)
|
||||
|> Enum.reduce({[], base_label + 1}, fn {post, index}, {acc, next_label} ->
|
||||
:ok = report_rebuild_progress(on_progress, index, total_posts, "embedding entries")
|
||||
existing_key = Map.get(existing_keys, post.id)
|
||||
|
||||
case compute_key_data(post, existing_key, next_label) do
|
||||
:skip ->
|
||||
{acc, next_label}
|
||||
|
||||
{:upsert, row} ->
|
||||
bump = if existing_key, do: 0, else: 1
|
||||
{[row | acc], next_label + bump}
|
||||
end
|
||||
end)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
|
||||
:ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot")
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, post_ids}
|
||||
@@ -196,6 +229,53 @@ defmodule BDS.Embeddings do
|
||||
end
|
||||
end
|
||||
|
||||
defp preload_keys_by_post_id(project_id) do
|
||||
Repo.all(from key in Key, where: key.project_id == ^project_id)
|
||||
|> Map.new(&{&1.post_id, &1})
|
||||
end
|
||||
|
||||
defp preload_keys_by_post_id(project_id, post_ids) do
|
||||
Repo.all(
|
||||
from key in Key,
|
||||
where: key.project_id == ^project_id and key.post_id in ^post_ids
|
||||
)
|
||||
|> Map.new(&{&1.post_id, &1})
|
||||
end
|
||||
|
||||
defp max_label_value do
|
||||
Repo.one(from key in Key, select: max(key.label)) || 0
|
||||
end
|
||||
|
||||
defp compute_key_data(%Post{} = post, existing_key, next_label) do
|
||||
body = resolve_post_body(post)
|
||||
raw_text = compose_embedding_source(post.title, body)
|
||||
content_hash = hash_text(raw_text)
|
||||
|
||||
if existing_key && existing_key.content_hash == content_hash do
|
||||
:skip
|
||||
else
|
||||
{:ok, vector} = embed_text(raw_text, post.language)
|
||||
label = if existing_key, do: existing_key.label, else: next_label
|
||||
{:upsert, [label, post.id, post.project_id, content_hash, Jason.encode!(vector)]}
|
||||
end
|
||||
end
|
||||
|
||||
defp batch_upsert_keys([]), do: :ok
|
||||
|
||||
defp batch_upsert_keys(rows) do
|
||||
rows
|
||||
|> Enum.chunk_every(@key_batch_size)
|
||||
|> Enum.each(fn chunk ->
|
||||
placeholders = Enum.map_join(chunk, ", ", fn _ -> "(?, ?, ?, ?, ?)" end)
|
||||
params = List.flatten(chunk)
|
||||
|
||||
Repo.query!(
|
||||
"INSERT INTO embedding_keys (label, post_id, project_id, content_hash, vector) VALUES #{placeholders} ON CONFLICT(label) DO UPDATE SET content_hash = excluded.content_hash, vector = excluded.vector",
|
||||
params
|
||||
)
|
||||
end)
|
||||
end
|
||||
|
||||
def remove_post(post_id) when is_binary(post_id) do
|
||||
project_id =
|
||||
case Repo.get_by(Key, post_id: post_id) do
|
||||
@@ -227,23 +307,24 @@ defmodule BDS.Embeddings do
|
||||
order_by: [asc: post.created_at, asc: post.slug]
|
||||
)
|
||||
|
||||
Enum.each(posts, fn post ->
|
||||
body = resolve_post_body(post)
|
||||
content_hash = hash_text(compose_embedding_source(post.title, body))
|
||||
existing_keys = preload_keys_by_post_id(project_id)
|
||||
base_label = max_label_value()
|
||||
|
||||
case Repo.get_by(Key, post_id: post.id, project_id: project_id) do
|
||||
%Key{content_hash: ^content_hash} ->
|
||||
:ok
|
||||
{rows, _next_label} =
|
||||
Enum.reduce(posts, {[], base_label + 1}, fn post, {acc, next_label} ->
|
||||
existing_key = Map.get(existing_keys, post.id)
|
||||
|
||||
_other ->
|
||||
:ok =
|
||||
sync_post_if_enabled(
|
||||
%{post | content: if(post.content in [nil, ""], do: body, else: post.content)},
|
||||
refresh_index: false
|
||||
)
|
||||
case compute_key_data(post, existing_key, next_label) do
|
||||
:skip ->
|
||||
{acc, next_label}
|
||||
|
||||
{:upsert, row} ->
|
||||
bump = if existing_key, do: 0, else: 1
|
||||
{[row | acc], next_label + bump}
|
||||
end
|
||||
end)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
|
||||
indexed =
|
||||
|
||||
206
test/bds/csm033_batch_inserts_test.exs
Normal file
206
test/bds/csm033_batch_inserts_test.exs
Normal file
@@ -0,0 +1,206 @@
|
||||
defmodule BDS.CSM033BatchInsertsTest do
|
||||
use ExUnit.Case, async: false
|
||||
import Ecto.Query
|
||||
|
||||
describe "source-level: embeddings.ex uses batch inserts instead of Enum.each + individual writes" do
|
||||
setup do
|
||||
source = File.read!("lib/bds/embeddings.ex")
|
||||
%{source: source}
|
||||
end
|
||||
|
||||
test "no Enum.each calling sync_post_if_enabled in bulk paths", %{source: source} do
|
||||
refute source =~ "Enum.each(posts, &sync_post_if_enabled",
|
||||
"bulk paths should not use Enum.each with sync_post_if_enabled"
|
||||
|
||||
refute source =~ ~r/Enum\.each\(fn \{post, index\} ->\n\s+sync_post_if_enabled/,
|
||||
"bulk paths should not use Enum.each with sync_post_if_enabled"
|
||||
end
|
||||
|
||||
test "bulk functions use batch_upsert_keys", %{source: source} do
|
||||
assert source =~ "batch_upsert_keys(rows)",
|
||||
"expected batch_upsert_keys to be called with collected rows"
|
||||
end
|
||||
|
||||
test "bulk functions preload keys before the loop", %{source: source} do
|
||||
assert source =~ "preload_keys_by_post_id(project_id)",
|
||||
"expected keys to be preloaded in a single query"
|
||||
end
|
||||
|
||||
test "batch_upsert_keys uses multi-row INSERT with ON CONFLICT upsert", %{source: source} do
|
||||
assert source =~ "INSERT INTO embedding_keys",
|
||||
"expected raw SQL batch INSERT for embedding keys"
|
||||
|
||||
assert source =~ "ON CONFLICT(label) DO UPDATE",
|
||||
"expected ON CONFLICT upsert clause"
|
||||
end
|
||||
|
||||
test "compute_key_data is used instead of individual Repo.insert_or_update", %{source: source} do
|
||||
assert source =~ "compute_key_data(post, existing_key, next_label)",
|
||||
"expected compute_key_data helper for row computation"
|
||||
end
|
||||
end
|
||||
|
||||
describe "source-level: search.ex already uses batch inserts" do
|
||||
test "batch_insert_post_index uses multi-row VALUES" do
|
||||
source = File.read!("lib/bds/search.ex")
|
||||
assert source =~ "batch_insert_post_index"
|
||||
assert source =~ ~r/INSERT INTO posts_fts.*VALUES.*\#\{placeholders\}/s
|
||||
end
|
||||
|
||||
test "batch_insert_media_index uses multi-row VALUES" do
|
||||
source = File.read!("lib/bds/search.ex")
|
||||
assert source =~ "batch_insert_media_index"
|
||||
assert source =~ ~r/INSERT INTO media_fts.*VALUES.*\#\{placeholders\}/s
|
||||
end
|
||||
end
|
||||
|
||||
describe "functional: batch operations produce correct results" do
|
||||
defmodule FakeBackend do
|
||||
@behaviour BDS.Embeddings.Backend
|
||||
|
||||
@impl true
|
||||
def model_info, do: %{model_id: "fake/test-model", dimensions: 384}
|
||||
|
||||
@impl true
|
||||
def embed(text, opts), do: BDS.Embeddings.Backends.InApp.embed(text, opts)
|
||||
end
|
||||
|
||||
setup do
|
||||
:ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo)
|
||||
|
||||
temp_dir =
|
||||
Path.join(System.tmp_dir!(), "bds-csm033-#{System.unique_integer([:positive])}")
|
||||
|
||||
File.mkdir_p!(temp_dir)
|
||||
on_exit(fn -> File.rm_rf(temp_dir) end)
|
||||
|
||||
{:ok, project} = BDS.Projects.create_project(%{name: "CSM033", data_path: temp_dir})
|
||||
|
||||
previous_config = Application.get_env(:bds, :embeddings)
|
||||
Application.put_env(:bds, :embeddings, backend: FakeBackend)
|
||||
|
||||
on_exit(fn ->
|
||||
if previous_config == nil do
|
||||
Application.delete_env(:bds, :embeddings)
|
||||
else
|
||||
Application.put_env(:bds, :embeddings, previous_config)
|
||||
end
|
||||
end)
|
||||
|
||||
assert {:ok, _metadata} =
|
||||
BDS.Metadata.update_project_metadata(project.id, %{
|
||||
semantic_similarity_enabled: true
|
||||
})
|
||||
|
||||
%{project: project}
|
||||
end
|
||||
|
||||
test "index_unindexed batch-inserts keys for multiple posts", %{project: project} do
|
||||
posts =
|
||||
for i <- 1..5 do
|
||||
{:ok, post} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Post #{i}",
|
||||
content: "content for post number #{i} with unique words #{:rand.uniform(10000)}",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
post
|
||||
end
|
||||
|
||||
{:ok, indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
assert length(indexed) == 5
|
||||
assert Enum.all?(posts, fn post -> post.id in indexed end)
|
||||
|
||||
keys =
|
||||
BDS.Repo.all(
|
||||
from(k in BDS.Embeddings.Key, where: k.project_id == ^project.id)
|
||||
)
|
||||
|
||||
assert length(keys) == 5
|
||||
labels = Enum.map(keys, & &1.label) |> Enum.sort()
|
||||
assert labels == Enum.to_list(1..5)
|
||||
end
|
||||
|
||||
test "rebuild_project updates stale keys via batch upsert", %{project: project} do
|
||||
{:ok, post} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Rebuild Target",
|
||||
content: "original content for rebuild test",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
{:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
|
||||
original_key =
|
||||
BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id)
|
||||
|
||||
{:ok, _post} = BDS.Posts.update_post(post.id, %{content: "completely different content now"})
|
||||
|
||||
{:ok, rebuilt_ids} = BDS.Embeddings.rebuild_project(project.id)
|
||||
assert post.id in rebuilt_ids
|
||||
|
||||
updated_key =
|
||||
BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id)
|
||||
|
||||
assert updated_key.label == original_key.label
|
||||
assert updated_key.content_hash != original_key.content_hash
|
||||
end
|
||||
|
||||
test "repair_posts batch-upserts for specified posts only", %{project: project} do
|
||||
{:ok, post_a} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Repair A",
|
||||
content: "content A",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
{:ok, _post_b} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Repair B",
|
||||
content: "content B",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
{:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
{:ok, repaired} = BDS.Embeddings.repair_posts(project.id, [post_a.id])
|
||||
assert repaired == [post_a.id]
|
||||
|
||||
keys =
|
||||
BDS.Repo.all(
|
||||
from(k in BDS.Embeddings.Key, where: k.project_id == ^project.id)
|
||||
)
|
||||
|
||||
assert length(keys) == 2
|
||||
end
|
||||
|
||||
test "index_unindexed skips posts with matching content hash", %{project: project} do
|
||||
{:ok, post} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Skip Test",
|
||||
content: "unchanged content for skip test",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
{:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
|
||||
key_before =
|
||||
BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id)
|
||||
|
||||
{:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
|
||||
key_after =
|
||||
BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id)
|
||||
|
||||
assert key_before.label == key_after.label
|
||||
assert key_before.content_hash == key_after.content_hash
|
||||
assert key_before.vector == key_after.vector
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
Reference in New Issue
Block a user