fix: force full re-embed on explicit rebuild and degrade gracefully when embedding model is unavailable
This commit is contained in:
@@ -2,6 +2,7 @@ defmodule BDS.Embeddings do
|
||||
@moduledoc false
|
||||
|
||||
import Ecto.Query
|
||||
require Logger
|
||||
|
||||
alias BDS.Persistence
|
||||
alias BDS.Embeddings.DismissedDuplicatePair
|
||||
@@ -75,11 +76,16 @@ defmodule BDS.Embeddings do
|
||||
)
|
||||
|
||||
existing_keys = preload_keys_by_post_id(project_id, Enum.map(posts, & &1.id))
|
||||
rows = build_key_rows(posts, existing_keys, max_label_value(), nil)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, Enum.map(posts, & &1.id)}
|
||||
case build_key_rows(posts, existing_keys, max_label_value(), nil, false) do
|
||||
{:ok, rows} ->
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, Enum.map(posts, & &1.id)}
|
||||
|
||||
{:error, _reason} = error ->
|
||||
error
|
||||
end
|
||||
else
|
||||
{:ok, []}
|
||||
end
|
||||
@@ -106,13 +112,19 @@ defmodule BDS.Embeddings do
|
||||
)
|
||||
|
||||
existing_keys = preload_keys_by_post_id(project_id)
|
||||
rows = build_key_rows(posts, existing_keys, max_label_value(), on_progress)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
# An explicit rebuild re-embeds every post from scratch (ReindexAll),
|
||||
# ignoring the content_hash skip optimisation.
|
||||
case build_key_rows(posts, existing_keys, max_label_value(), on_progress, true) do
|
||||
{:ok, rows} ->
|
||||
batch_upsert_keys(rows)
|
||||
:ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot")
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, post_ids}
|
||||
|
||||
:ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot")
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, post_ids}
|
||||
{:error, _reason} = error ->
|
||||
error
|
||||
end
|
||||
else
|
||||
{:ok, []}
|
||||
end
|
||||
@@ -172,24 +184,36 @@ defmodule BDS.Embeddings do
|
||||
:ok
|
||||
|
||||
existing_key ->
|
||||
label = existing_key_label(existing_key) || next_label()
|
||||
{:ok, vector} = embed_text(raw_text, post.language)
|
||||
case embed_text(raw_text, post.language) do
|
||||
{:ok, vector} ->
|
||||
label = existing_key_label(existing_key) || next_label()
|
||||
|
||||
(existing_key || %Key{})
|
||||
|> Key.changeset(%{
|
||||
label: label,
|
||||
post_id: post.id,
|
||||
project_id: post.project_id,
|
||||
content_hash: content_hash,
|
||||
vector: encode_vector(vector)
|
||||
})
|
||||
|> Repo.insert_or_update()
|
||||
(existing_key || %Key{})
|
||||
|> Key.changeset(%{
|
||||
label: label,
|
||||
post_id: post.id,
|
||||
project_id: post.project_id,
|
||||
content_hash: content_hash,
|
||||
vector: encode_vector(vector)
|
||||
})
|
||||
|> Repo.insert_or_update()
|
||||
|
||||
if Keyword.get(opts, :refresh_index, true) do
|
||||
:ok = rebuild_snapshot(post.project_id)
|
||||
if Keyword.get(opts, :refresh_index, true) do
|
||||
:ok = rebuild_snapshot(post.project_id)
|
||||
end
|
||||
|
||||
:ok
|
||||
|
||||
{:error, reason} ->
|
||||
# Embedding is best-effort on post save: if the model is unavailable
|
||||
# (e.g. offline first-use download), leave the post unindexed rather
|
||||
# than failing the save. An explicit reindex surfaces the error.
|
||||
Logger.warning(
|
||||
"Embedding unavailable for post #{post.id}: #{inspect(reason)}; left unindexed"
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
@@ -210,11 +234,12 @@ defmodule BDS.Embeddings do
|
||||
Repo.one(from key in Key, select: max(key.label)) || 0
|
||||
end
|
||||
|
||||
# Builds the upsert rows for a batch of posts. Posts whose content_hash is
|
||||
# unchanged are skipped (ContentHashSkipsUnchanged); the rest are embedded in
|
||||
# batches (see embed_pending/2) so model inference is not serialised one post
|
||||
# at a time. Labels keep their existing value or take the next free integer.
|
||||
defp build_key_rows(posts, existing_keys, base_label, on_progress) do
|
||||
# Builds the upsert rows for a batch of posts. Unless `force?` is set, posts
|
||||
# whose content_hash is unchanged are skipped (ContentHashSkipsUnchanged); the
|
||||
# rest are embedded in batches (see embed_pending/2) so model inference is not
|
||||
# serialised one post at a time. Labels keep their existing value or take the
|
||||
# next free integer. Returns `{:error, reason}` if the model is unavailable.
|
||||
defp build_key_rows(posts, existing_keys, base_label, on_progress, force?) do
|
||||
prepared =
|
||||
Enum.map(posts, fn post ->
|
||||
raw_text = compose_embedding_source(post.title, resolve_post_body(post))
|
||||
@@ -226,14 +251,20 @@ defmodule BDS.Embeddings do
|
||||
existing: existing,
|
||||
raw_text: raw_text,
|
||||
content_hash: content_hash,
|
||||
needs_embed?: is_nil(existing) or existing.content_hash != content_hash
|
||||
needs_embed?: force? or is_nil(existing) or existing.content_hash != content_hash
|
||||
}
|
||||
end)
|
||||
|
||||
pending = Enum.filter(prepared, & &1.needs_embed?)
|
||||
:ok = report_rebuild_started(on_progress, length(pending), "embedding entries")
|
||||
vectors_by_post_id = embed_pending(pending, on_progress)
|
||||
|
||||
case embed_pending(pending, on_progress) do
|
||||
{:ok, vectors_by_post_id} -> {:ok, collect_rows(prepared, vectors_by_post_id, base_label)}
|
||||
{:error, _reason} = error -> error
|
||||
end
|
||||
end
|
||||
|
||||
defp collect_rows(prepared, vectors_by_post_id, base_label) do
|
||||
{rows, _next_label} =
|
||||
Enum.reduce(prepared, {[], base_label + 1}, fn entry, {acc, next_label} ->
|
||||
if entry.needs_embed? do
|
||||
@@ -258,7 +289,7 @@ defmodule BDS.Embeddings do
|
||||
rows
|
||||
end
|
||||
|
||||
defp embed_pending([], _on_progress), do: %{}
|
||||
defp embed_pending([], _on_progress), do: {:ok, %{}}
|
||||
|
||||
defp embed_pending(pending, on_progress) do
|
||||
total = length(pending)
|
||||
@@ -268,25 +299,36 @@ defmodule BDS.Embeddings do
|
||||
# Group by language so the lexical stub stems consistently; the neural
|
||||
# backend is multilingual and ignores the language hint.
|
||||
|> Enum.group_by(& &1.post.language)
|
||||
|> Enum.reduce({%{}, 0}, fn {language, group}, acc ->
|
||||
|> Enum.reduce_while({%{}, 0}, fn {language, group}, acc ->
|
||||
group
|
||||
|> Enum.chunk_every(batch)
|
||||
|> Enum.reduce(acc, fn chunk, {vectors, done} ->
|
||||
{:ok, chunk_vectors} = embed_many(Enum.map(chunk, & &1.raw_text), language)
|
||||
|> Enum.reduce_while(acc, fn chunk, {vectors, done} ->
|
||||
case embed_many(Enum.map(chunk, & &1.raw_text), language) do
|
||||
{:ok, chunk_vectors} ->
|
||||
vectors =
|
||||
chunk
|
||||
|> Enum.zip(chunk_vectors)
|
||||
|> Enum.reduce(vectors, fn {entry, vector}, acc ->
|
||||
Map.put(acc, entry.post.id, vector)
|
||||
end)
|
||||
|
||||
vectors =
|
||||
chunk
|
||||
|> Enum.zip(chunk_vectors)
|
||||
|> Enum.reduce(vectors, fn {entry, vector}, acc ->
|
||||
Map.put(acc, entry.post.id, vector)
|
||||
end)
|
||||
done = done + length(chunk)
|
||||
:ok = report_rebuild_progress(on_progress, done, total, "embedding entries")
|
||||
{:cont, {vectors, done}}
|
||||
|
||||
done = done + length(chunk)
|
||||
:ok = report_rebuild_progress(on_progress, done, total, "embedding entries")
|
||||
{vectors, done}
|
||||
{:error, reason} ->
|
||||
{:halt, {:error, reason}}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:error, reason} -> {:halt, {:error, reason}}
|
||||
accumulator -> {:cont, accumulator}
|
||||
end
|
||||
end)
|
||||
|> elem(0)
|
||||
|> case do
|
||||
{:error, reason} -> {:error, reason}
|
||||
{vectors, _done} -> {:ok, vectors}
|
||||
end
|
||||
end
|
||||
|
||||
defp batch_upsert_keys([]), do: :ok
|
||||
@@ -337,15 +379,20 @@ defmodule BDS.Embeddings do
|
||||
)
|
||||
|
||||
existing_keys = preload_keys_by_post_id(project_id)
|
||||
rows = build_key_rows(posts, existing_keys, max_label_value(), nil)
|
||||
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
case build_key_rows(posts, existing_keys, max_label_value(), nil, false) do
|
||||
{:ok, rows} ->
|
||||
batch_upsert_keys(rows)
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
|
||||
indexed =
|
||||
Repo.all(from key in Key, where: key.project_id == ^project_id, select: key.post_id)
|
||||
indexed =
|
||||
Repo.all(from key in Key, where: key.project_id == ^project_id, select: key.post_id)
|
||||
|
||||
{:ok, indexed}
|
||||
{:ok, indexed}
|
||||
|
||||
{:error, _reason} = error ->
|
||||
error
|
||||
end
|
||||
else
|
||||
{:ok, []}
|
||||
end
|
||||
@@ -677,13 +724,16 @@ defmodule BDS.Embeddings do
|
||||
if function_exported?(backend, :embed_many, 2) do
|
||||
backend.embed_many(texts, language: language)
|
||||
else
|
||||
vectors =
|
||||
Enum.map(texts, fn text ->
|
||||
{:ok, vector} = backend.embed(text, language: language)
|
||||
vector
|
||||
end)
|
||||
|
||||
{:ok, vectors}
|
||||
Enum.reduce_while(texts, {:ok, []}, fn text, {:ok, acc} ->
|
||||
case backend.embed(text, language: language) do
|
||||
{:ok, vector} -> {:cont, {:ok, [vector | acc]}}
|
||||
{:error, _reason} = error -> {:halt, error}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:ok, vectors} -> {:ok, Enum.reverse(vectors)}
|
||||
{:error, _reason} = error -> error
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
Reference in New Issue
Block a user