758 lines
22 KiB
Elixir
758 lines
22 KiB
Elixir
defmodule BDS.Embeddings do
|
|
@moduledoc false
|
|
|
|
import Ecto.Query
|
|
|
|
alias BDS.Persistence
|
|
alias BDS.Embeddings.DismissedDuplicatePair
|
|
alias BDS.Embeddings.Index
|
|
alias BDS.Embeddings.Key
|
|
alias BDS.Metadata
|
|
alias BDS.Posts.Post
|
|
alias BDS.ProgressReporter
|
|
alias BDS.Projects
|
|
alias BDS.Repo
|
|
|
|
@duplicate_threshold 0.92
|
|
@exact_match_score 0.999999
|
|
@key_batch_size 199
|
|
|
|
@doc """
Returns the `model_id` field of the configured backend's `model_info/0`.
"""
def model_id do
  configured_backend().model_info().model_id
end
|
|
@doc """
Returns the `dimensions` field of the configured backend's `model_info/0`.
"""
def dimensions do
  configured_backend().model_info().dimensions
end
|
|
@doc """
Delegates to `Index.path/1` for `project_id`.
"""
def index_path(project_id) do
  Index.path(project_id)
end
|
|
@doc """
Alias for `rebuild_project/1` (called with default options).
"""
def reindex_all(project_id) do
  rebuild_project(project_id)
end
|
|
|
|
@doc """
Rebuilds the index snapshot for `project_id` when embeddings are enabled for
that project; does nothing otherwise.

Always returns `:ok`. Raises a `MatchError` if the snapshot rebuild returns
anything other than `:ok`.
"""
def refresh_snapshot(project_id) when is_binary(project_id) do
  case enabled_for_project?(project_id) do
    true -> :ok = rebuild_snapshot(project_id)
    false -> :ok
  end
end
|
|
|
|
@doc """
Reports how many posts of `project_id` have an embedding key.

Returns `{:ok, %{indexed: count, total: count}}`, where `indexed` is the number
of distinct post ids in the project's keys and `total` is the number of posts
in the project.
"""
def get_indexing_progress(project_id) when is_binary(project_id) do
  indexed_query =
    from key in Key,
      where: key.project_id == ^project_id,
      select: count(key.post_id, :distinct)

  total_query =
    from post in Post,
      where: post.project_id == ^project_id,
      select: count(post.id)

  indexed_count = Repo.one(indexed_query) || 0
  total_count = Repo.one(total_query) || 0

  {:ok, %{indexed: indexed_count, total: total_count}}
end
|
|
|
|
@doc """
Synchronises the embedding key of a single post and refreshes the snapshot.

Accepts either a `%Post{}` or a post id. Returns `:ok` when the post does not
exist or when embeddings are disabled for the post's project.
"""
def sync_post(%Post{} = post) do
  case enabled_for_project?(post.project_id) do
    true -> sync_post_if_enabled(post, refresh_index: true)
    false -> :ok
  end
end

def sync_post(post_id) when is_binary(post_id) do
  with %Post{} = post <- Repo.get(Post, post_id) do
    sync_post(post)
  else
    # Unknown post id: nothing to synchronise.
    nil -> :ok
  end
end
|
|
|
|
@doc """
Recomputes embedding keys for the given posts of `project_id` and rebuilds the
project snapshot.

Duplicate ids in `post_ids` are ignored. Returns `{:ok, ids}` with the ids of
the posts that were found in the project (ordered by `created_at`, then
`slug`), or `{:ok, []}` when embeddings are disabled for the project.
"""
def repair_posts(project_id, post_ids) when is_binary(project_id) and is_list(post_ids) do
  if enabled_for_project?(project_id) do
    unique_ids = Enum.uniq(post_ids)

    posts_query =
      from post in Post,
        where: post.project_id == ^project_id and post.id in ^unique_ids,
        order_by: [asc: post.created_at, asc: post.slug]

    posts = Repo.all(posts_query)
    found_ids = Enum.map(posts, & &1.id)
    keys_by_post = preload_keys_by_post_id(project_id, found_ids)
    first_free_label = max_label_value() + 1

    {rows, _label_after} =
      Enum.reduce(posts, {[], first_free_label}, fn post, {rows_acc, label} ->
        known_key = Map.get(keys_by_post, post.id)

        case compute_key_data(post, known_key, label) do
          # A brand-new key consumed `label`, so advance to the next one.
          {:upsert, row} when is_nil(known_key) -> {[row | rows_acc], label + 1}
          # An existing key keeps its own label; the candidate stays free.
          {:upsert, row} -> {[row | rows_acc], label}
          :skip -> {rows_acc, label}
        end
      end)

    batch_upsert_keys(rows)
    :ok = rebuild_snapshot(project_id)
    {:ok, found_ids}
  else
    {:ok, []}
  end
end
|
|
|
|
@doc """
Rebuilds the embedding keys and the snapshot for every post of `project_id`.

Keys of the project whose post id is not among the project's current posts are
deleted first. Posts whose stored content hash is unchanged are skipped.
Progress is reported through the callback derived from `opts` via
`ProgressReporter.callback/1`.

Returns `{:ok, post_ids}` with the ids of all posts of the project, or
`{:ok, []}` when embeddings are disabled for the project.
"""
def rebuild_project(project_id, opts \\ [])

def rebuild_project(project_id, opts) when is_binary(project_id) and is_list(opts) do
  if enabled_for_project?(project_id) do
    on_progress = progress_callback(opts)

    posts_query =
      from post in Post,
        where: post.project_id == ^project_id,
        order_by: [asc: post.created_at, asc: post.slug]

    posts = Repo.all(posts_query)
    post_ids = Enum.map(posts, & &1.id)
    total_posts = length(posts)

    :ok = report_rebuild_started(on_progress, total_posts, "embedding entries")

    # Remove keys that point at posts no longer present in this project.
    stale_keys_query =
      from key in Key,
        where: key.project_id == ^project_id and key.post_id not in ^post_ids

    Repo.delete_all(stale_keys_query)

    keys_by_post = preload_keys_by_post_id(project_id)
    first_free_label = max_label_value() + 1

    {rows, _label_after} =
      posts
      |> Enum.with_index(1)
      |> Enum.reduce({[], first_free_label}, fn {post, position}, {rows_acc, label} ->
        :ok = report_rebuild_progress(on_progress, position, total_posts, "embedding entries")
        known_key = Map.get(keys_by_post, post.id)

        case compute_key_data(post, known_key, label) do
          # A brand-new key consumed `label`, so advance to the next one.
          {:upsert, row} when is_nil(known_key) -> {[row | rows_acc], label + 1}
          # An existing key keeps its own label; the candidate stays free.
          {:upsert, row} -> {[row | rows_acc], label}
          :skip -> {rows_acc, label}
        end
      end)

    batch_upsert_keys(rows)

    :ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot")
    :ok = rebuild_snapshot(project_id)
    {:ok, post_ids}
  else
    {:ok, []}
  end
end
|
|
|
|
@doc """
Lists the posts of `project_id` whose stored embedding key is out of date.

For each post the expected content hash is recomputed and compared with the
stored key. A report map (`entity_type`, `entity_id`, `label`, `meta_label`,
`differences`) is produced only for posts with at least one difference.
Returns `[]` when embeddings are disabled for the project.
"""
def diff_reports(project_id) when is_binary(project_id) do
  if enabled_for_project?(project_id) do
    keys_by_post = preload_keys_by_post_id(project_id)
    posts = Repo.all(from post in Post, where: post.project_id == ^project_id)

    Enum.flat_map(posts, fn post ->
      expected_hash = post_content_hash(post)
      key = Map.get(keys_by_post, post.id)

      candidate_fields = [
        diff_field("content_hash", key && key.content_hash, expected_hash),
        diff_field(
          "embedding",
          current_embedding_status(key, expected_hash),
          expected_embedding_status(key, expected_hash)
        )
      ]

      # `diff_field/3` yields nil for fields that already agree.
      case Enum.reject(candidate_fields, &is_nil/1) do
        [] ->
          []

        field_diffs ->
          [
            %{
              entity_type: "embedding",
              entity_id: post.id,
              label: post.title || post.slug || post.id,
              meta_label: Persistence.timestamp_to_iso8601(post.created_at),
              differences: field_diffs
            }
          ]
      end
    end)
  else
    []
  end
end
|
|
|
|
# Embeds `post` and stores the result in its `Key` row, unless the stored
# content hash already matches the post's current title and body.
#
# `opts[:refresh_index]` (default `true`) controls whether the project snapshot
# is rebuilt afterwards. When the key is already current, the snapshot is only
# rebuilt if its recorded hash for this post differs from the current hash.
#
# Always returns `:ok`. Raises if embedding fails, if the key cannot be
# persisted, or if the snapshot rebuild does not return `:ok`.
defp sync_post_if_enabled(%Post{} = post, opts) do
  raw_text = compose_embedding_source(post.title, resolve_post_body(post))
  content_hash = hash_text(raw_text)
  refresh_index? = Keyword.get(opts, :refresh_index, true)

  case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
    %Key{content_hash: ^content_hash} ->
      if refresh_index? and snapshot_content_hash(post.project_id, post.id) != content_hash do
        :ok = rebuild_snapshot(post.project_id)
      end

      :ok

    # Either no key yet (nil) or a key with a stale hash.
    existing_key ->
      label = existing_key_label(existing_key) || next_label()
      {:ok, vector} = embed_text(raw_text, post.language)

      # Bang variant on purpose: a rejected changeset must raise rather than be
      # discarded while this function still reports `:ok` to its caller.
      (existing_key || %Key{})
      |> Key.changeset(%{
        label: label,
        post_id: post.id,
        project_id: post.project_id,
        content_hash: content_hash,
        vector: Jason.encode!(vector)
      })
      |> Repo.insert_or_update!()

      if refresh_index? do
        :ok = rebuild_snapshot(post.project_id)
      end

      :ok
  end
end
|
|
|
|
# Loads the embedding keys of a project into a map of `post_id => key`.
# The two-argument form restricts the lookup to the given post ids.
defp preload_keys_by_post_id(project_id) do
  query = from key in Key, where: key.project_id == ^project_id

  query
  |> Repo.all()
  |> Map.new(fn key -> {key.post_id, key} end)
end

defp preload_keys_by_post_id(project_id, post_ids) do
  query =
    from key in Key,
      where: key.project_id == ^project_id and key.post_id in ^post_ids

  query
  |> Repo.all()
  |> Map.new(fn key -> {key.post_id, key} end)
end
|
|
|
|
# Highest label stored across all keys (not scoped to a project), or 0 when
# there are no keys.
defp max_label_value do
  case Repo.one(from key in Key, select: max(key.label)) do
    nil -> 0
    highest -> highest
  end
end
|
|
|
|
# Decides whether `post` needs a (re-)embedding.
#
# Returns `:skip` when `existing_key` already carries the current content hash.
# Otherwise embeds the post and returns `{:upsert, row}` where `row` is
# `[label, post_id, project_id, content_hash, json_vector]`. The label is the
# existing key's label when there is one, else `next_label`.
defp compute_key_data(%Post{} = post, existing_key, next_label) do
  raw_text = compose_embedding_source(post.title, resolve_post_body(post))
  content_hash = hash_text(raw_text)

  if is_nil(existing_key) or existing_key.content_hash != content_hash do
    {:ok, vector} = embed_text(raw_text, post.language)

    label =
      case existing_key do
        nil -> next_label
        key -> key.label
      end

    {:upsert, [label, post.id, post.project_id, content_hash, Jason.encode!(vector)]}
  else
    :skip
  end
end
|
|
|
|
# Upserts key rows (as produced by `compute_key_data/3`) with raw SQL, in
# chunks of `@key_batch_size` rows. On a label conflict only `content_hash` and
# `vector` are overwritten. Returns `:ok`; raises if a statement fails.
defp batch_upsert_keys([]), do: :ok

defp batch_upsert_keys(rows) do
  for chunk <- Enum.chunk_every(rows, @key_batch_size) do
    # One "(?, ?, ?, ?, ?)" group per row in this chunk.
    placeholders =
      "(?, ?, ?, ?, ?)"
      |> List.duplicate(length(chunk))
      |> Enum.join(", ")

    sql =
      "INSERT INTO embedding_keys (label, post_id, project_id, content_hash, vector) VALUES " <>
        placeholders <>
        " ON CONFLICT(label) DO UPDATE SET content_hash = excluded.content_hash, vector = excluded.vector"

    Repo.query!(sql, List.flatten(chunk))
  end

  :ok
end
|
|
|
|
@doc """
Deletes every embedding key of `post_id` and, when the owning project can be
determined and has embeddings enabled, rebuilds that project's snapshot.

The project is taken from the stored key if one exists, otherwise from the
post itself. Always returns `:ok`.
"""
def remove_post(post_id) when is_binary(post_id) do
  project_id =
    with nil <- Repo.get_by(Key, post_id: post_id),
         nil <- Repo.get(Post, post_id) do
      nil
    else
      %Key{project_id: owner_id} -> owner_id
      %Post{project_id: owner_id} -> owner_id
    end

  Repo.delete_all(from key in Key, where: key.post_id == ^post_id)

  if is_binary(project_id) and enabled_for_project?(project_id) do
    :ok = rebuild_snapshot(project_id)
  end

  :ok
end
|
|
|
|
@doc """
Embeds every post of `project_id` that has no key or whose content hash has
changed, then rebuilds the project snapshot.

Returns `{:ok, post_ids}` with the post ids of all keys stored for the project
afterwards, or `{:ok, []}` when embeddings are disabled for the project.
"""
def index_unindexed(project_id) when is_binary(project_id) do
  if enabled_for_project?(project_id) do
    posts_query =
      from post in Post,
        where: post.project_id == ^project_id,
        order_by: [asc: post.created_at, asc: post.slug]

    posts = Repo.all(posts_query)
    keys_by_post = preload_keys_by_post_id(project_id)
    first_free_label = max_label_value() + 1

    {rows, _label_after} =
      Enum.reduce(posts, {[], first_free_label}, fn post, {rows_acc, label} ->
        known_key = Map.get(keys_by_post, post.id)

        case compute_key_data(post, known_key, label) do
          # A brand-new key consumed `label`, so advance to the next one.
          {:upsert, row} when is_nil(known_key) -> {[row | rows_acc], label + 1}
          # An existing key keeps its own label; the candidate stays free.
          {:upsert, row} -> {[row | rows_acc], label}
          :skip -> {rows_acc, label}
        end
      end)

    batch_upsert_keys(rows)
    :ok = rebuild_snapshot(project_id)

    indexed_ids_query =
      from key in Key, where: key.project_id == ^project_id, select: key.post_id

    {:ok, Repo.all(indexed_ids_query)}
  else
    {:ok, []}
  end
end
|
|
|
|
@doc """
Finds the posts most similar to `post_id` within the same project.

Uses `Index.neighbors/3` when the index is available. When it reports
`{:error, :missing}`, every other key of the project is scored against the
source vector and the top `limit` results (`%{post_id: _, score: _}`, highest
score first) are returned.

Returns `{:ok, []}` when the post does not exist or embeddings are disabled.
"""
def find_similar(post_id, limit \\ 5) when is_binary(post_id) and is_integer(limit) do
  case source_post_and_vector(post_id) do
    {:ok, post, source_vector} ->
      case Index.neighbors(post.project_id, post.id, limit) do
        {:ok, neighbors} ->
          {:ok, neighbors}

        {:error, :missing} ->
          other_keys =
            Repo.all(
              from key in Key,
                where: key.project_id == ^post.project_id and key.post_id != ^post.id
            )

          ranked =
            other_keys
            |> Enum.map(fn key ->
              score = cosine_similarity(source_vector, decode_vector(key.vector))
              %{post_id: key.post_id, score: score}
            end)
            |> Enum.sort_by(& &1.score, :desc)
            # A negative limit yields no results rather than taking from the tail.
            |> Enum.take(max(limit, 0))

          {:ok, ranked}
      end

    {:disabled, _project_id} ->
      {:ok, []}

    {:error, :not_found} ->
      {:ok, []}
  end
end
|
|
|
|
@doc """
Scores `source_post_id` against each of `target_post_ids`.

Returns `{:ok, scores}` where `scores` maps a target post id to its similarity
with the source. Only targets that have a key in the source's project are
included, and the source itself is never included. Returns `{:ok, %{}}` when
the source post does not exist or embeddings are disabled.
"""
def compute_similarities(source_post_id, target_post_ids)
    when is_binary(source_post_id) and is_list(target_post_ids) do
  case source_post_and_vector(source_post_id) do
    {:ok, post, source_vector} ->
      target_ids = Enum.uniq(target_post_ids)

      target_keys =
        Repo.all(
          from key in Key,
            where: key.project_id == ^post.project_id and key.post_id in ^target_ids
        )

      scores =
        for key <- target_keys, key.post_id != source_post_id, into: %{} do
          {key.post_id, cosine_similarity(source_vector, decode_vector(key.vector))}
        end

      {:ok, scores}

    {:disabled, _project_id} ->
      {:ok, %{}}

    {:error, :not_found} ->
      {:ok, %{}}
  end
end
|
|
|
|
@doc """
Suggests up to five tags for `post_id` based on its ten most similar posts.

Each tag of a similar post is credited with that post's similarity score;
scores are summed per tag and the five highest-scoring tags are returned as
`{:ok, tags}`. The second argument is ignored. Returns `{:ok, []}` when the
post does not exist.
"""
def suggest_tags(post_id, _input_text) when is_binary(post_id) do
  with {:ok, _post} <- fetch_post(post_id),
       {:ok, similar} <- find_similar(post_id, 10) do
    similar_ids = Enum.map(similar, & &1.post_id)

    posts_by_id =
      Repo.all(from other in Post, where: other.id in ^similar_ids)
      |> Map.new(&{&1.id, &1})

    # One {tag, score} entry per tag of every similar post that still exists.
    weighted_tags =
      Enum.flat_map(similar, fn %{post_id: neighbor_id, score: score} ->
        case posts_by_id do
          %{^neighbor_id => neighbor} -> Enum.map(neighbor.tags || [], &{&1, score})
          _missing -> []
        end
      end)

    totals =
      Enum.reduce(weighted_tags, %{}, fn {tag, score}, acc ->
        Map.update(acc, tag, score, &(&1 + score))
      end)

    suggestions =
      totals
      |> Enum.sort_by(fn {_tag, total} -> total end, :desc)
      |> Enum.take(5)
      |> Enum.map(fn {tag, _total} -> tag end)

    {:ok, suggestions}
  else
    {:error, :not_found} -> {:ok, []}
  end
end
|
|
|
|
@doc """
Finds pairs of posts in `project_id` whose similarity is at least
`@duplicate_threshold`, excluding pairs that were dismissed.

Pairs come from `Index.duplicate_pairs/3` when the index is available. When it
reports `{:error, :missing}`, all stored keys of the project are compared
pairwise instead. Progress is reported through the callback derived from
`opts` via `ProgressReporter.callback/1`.

Returns `{:ok, pairs}` with the enriched, sorted pairs, or `{:ok, []}` when
embeddings are disabled for the project.
"""
def find_duplicates(project_id, opts \\ []) when is_binary(project_id) do
  if enabled_for_project?(project_id) do
    on_progress = progress_callback(opts)
    dismissed = dismissed_pair_keys(project_id)

    duplicates =
      case Index.duplicate_pairs(project_id, @duplicate_threshold, on_progress: on_progress) do
        {:ok, pairs} ->
          pairs
          |> Enum.reject(fn pair -> pair_key(pair.post_id_a, pair.post_id_b) in dismissed end)
          |> enrich_duplicate_pairs(project_id)

        {:error, :missing} ->
          project_id
          |> brute_force_duplicate_pairs(dismissed, on_progress)
          |> enrich_duplicate_pairs(project_id)
      end

    :ok = report_rebuild_phase(on_progress, 0.99, "Resolving duplicate candidates")
    {:ok, duplicates}
  else
    {:ok, []}
  end
end

# Compares every stored key of the project against every other one and returns
# the non-dismissed pairs scoring at least `@duplicate_threshold`.
defp brute_force_duplicate_pairs(project_id, dismissed, on_progress) do
  keys =
    Repo.all(
      from key in Key,
        where: key.project_id == ^project_id,
        order_by: [asc: key.post_id]
    )

  total_keys = length(keys)

  :ok = report_rebuild_started(on_progress, total_keys, "embedding entries")

  # Decode each stored vector exactly once. Decoding inside the pairwise loop
  # would re-parse the same JSON for every candidate pair.
  decoded = Enum.map(keys, fn key -> {key.post_id, decode_vector(key.vector)} end)

  decoded
  |> Enum.with_index(1)
  |> Enum.flat_map(fn {{left_id, left_vector}, index} ->
    :ok = report_rebuild_progress(on_progress, index, total_keys, "embedding entries")

    # `left_id < right_id` visits each unordered pair once and skips self-pairs.
    for {right_id, right_vector} <- decoded,
        left_id < right_id,
        pair_key(left_id, right_id) not in dismissed,
        similarity = cosine_similarity(left_vector, right_vector),
        similarity >= @duplicate_threshold do
      %{post_id_a: left_id, post_id_b: right_id, score: similarity}
    end
  end)
end
|
|
|
|
@doc """
Records that the pair (`post_id_a`, `post_id_b`) is not a duplicate.

The ids are stored in sorted order, so the argument order does not matter. An
existing dismissal for the pair is updated with a fresh `dismissed_at`.

Returns `{:ok, pair}`, or `{:error, :not_found}` when either post is missing
or the posts belong to different projects. Raises if the record cannot be
saved.
"""
def dismiss_duplicate_pair(post_id_a, post_id_b)
    when is_binary(post_id_a) and is_binary(post_id_b) do
  with {:ok, %Post{project_id: project_id}} <- fetch_post(post_id_a),
       # The pin requires the second post to live in the same project.
       {:ok, %Post{project_id: ^project_id}} <- fetch_post(post_id_b) do
    {first_id, second_id} = sort_pair(post_id_a, post_id_b)
    lookup = [project_id: project_id, post_id_a: first_id, post_id_b: second_id]

    existing = Repo.get_by(DismissedDuplicatePair, lookup) || %DismissedDuplicatePair{}

    attrs = %{
      id: existing.id || Ecto.UUID.generate(),
      project_id: project_id,
      post_id_a: first_id,
      post_id_b: second_id,
      dismissed_at: Persistence.now_ms()
    }

    saved_pair =
      existing
      |> DismissedDuplicatePair.changeset(attrs)
      |> Repo.insert_or_update!()

    {:ok, saved_pair}
  else
    _ -> {:error, :not_found}
  end
end
|
|
|
|
@doc """
Dismisses several duplicate pairs in one call.

`pair_ids` is a list of `{post_id_a, post_id_b}` tuples; entries that are not
two-element tuples of binaries are ignored, and pairs that are equal after
sorting are processed once. Stops at the first failure.

Returns `{:ok, saved_pairs}` in processing order, or `{:error, reason}` from
the first pair that could not be dismissed.
"""
def dismiss_duplicate_pairs(pair_ids) when is_list(pair_ids) do
  sorted_pairs =
    for {post_id_a, post_id_b} <- pair_ids,
        is_binary(post_id_a),
        is_binary(post_id_b),
        uniq: true do
      sort_pair(post_id_a, post_id_b)
    end

  outcome =
    Enum.reduce_while(sorted_pairs, [], fn {post_id_a, post_id_b}, saved ->
      case dismiss_duplicate_pair(post_id_a, post_id_b) do
        {:ok, saved_pair} -> {:cont, [saved_pair | saved]}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)

  case outcome do
    {:error, reason} -> {:error, reason}
    # Pairs were prepended while reducing; restore processing order.
    saved when is_list(saved) -> {:ok, Enum.reverse(saved)}
  end
end
|
|
|
|
# Loads a post together with its decoded embedding vector, creating the key
# first if the post has none.
#
# Returns `{:ok, post, vector}` (vector is `[]` when no key exists even after
# syncing), `{:disabled, project_id}` when embeddings are off for the post's
# project, or whatever `fetch_post/1` returned when the post was not found.
defp source_post_and_vector(post_id) do
  case fetch_post(post_id) do
    {:ok, post} ->
      if enabled_for_project?(post.project_id) do
        :ok = ensure_key(post)

        case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
          nil -> {:ok, post, []}
          stored_key -> {:ok, post, decode_vector(stored_key.vector)}
        end
      else
        {:disabled, post.project_id}
      end

    not_found ->
      not_found
  end
end
|
|
|
|
# Syncs `post` via `sync_post/1` only when it has no stored key yet; an
# existing key is left alone regardless of its content hash.
defp ensure_key(%Post{} = post) do
  if Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
    :ok
  else
    sync_post(post)
  end
end
|
|
|
|
# Tagged-tuple wrapper around `Repo.get(Post, id)`:
# `{:ok, post}` or `{:error, :not_found}`.
defp fetch_post(post_id) do
  with %Post{} = found <- Repo.get(Post, post_id) do
    {:ok, found}
  else
    nil -> {:error, :not_found}
  end
end
|
|
|
|
# Adds `title_a`, `title_b`, `similarity` and `exact_match` to each duplicate
# pair and sorts the result: exact matches first, then by descending score,
# then by post ids.
#
# Pairs that reference a post which is not (or no longer) part of `project_id`
# are dropped. A single orphaned key therefore cannot crash the whole report.
defp enrich_duplicate_pairs(pairs, project_id) do
  post_ids =
    pairs
    |> Enum.flat_map(&[&1.post_id_a, &1.post_id_b])
    |> Enum.uniq()

  posts_by_id =
    Repo.all(
      from post in Post, where: post.project_id == ^project_id and post.id in ^post_ids
    )
    |> Map.new(&{&1.id, &1})

  pairs
  |> Enum.flat_map(fn pair ->
    with {:ok, post_a} <- Map.fetch(posts_by_id, pair.post_id_a),
         {:ok, post_b} <- Map.fetch(posts_by_id, pair.post_id_b) do
      exact_match = exact_duplicate_match?(pair.score, post_a, post_b)

      enriched =
        pair
        |> Map.put(:title_a, post_a.title || "")
        |> Map.put(:title_b, post_b.title || "")
        |> Map.put(:similarity, pair.score)
        |> Map.put(:exact_match, exact_match)

      [enriched]
    else
      # One side of the pair has no matching post in this project.
      :error -> []
    end
  end)
  |> Enum.sort_by(fn pair ->
    {not pair.exact_match, -pair.score, pair.post_id_a, pair.post_id_b}
  end)
end
|
|
|
|
# True when the score is at least `@exact_match_score` and both posts have the
# same title (nil treated as "") and the same resolved body. The bodies are
# only resolved when the two cheaper checks have passed.
defp exact_duplicate_match?(score, %Post{} = post_a, %Post{} = post_b) do
  cond do
    score < @exact_match_score -> false
    (post_a.title || "") != (post_b.title || "") -> false
    true -> resolve_post_body(post_a) == resolve_post_body(post_b)
  end
end
|
|
|
|
# True only when the project's metadata has `semantic_similarity_enabled` set
# to exactly `true`. Any result other than `{:ok, metadata}` from
# `Metadata.get_project_metadata/1` raises a `CaseClauseError`.
defp enabled_for_project?(project_id) do
  case Metadata.get_project_metadata(project_id) do
    {:ok, project_metadata} ->
      project_metadata.semantic_similarity_enabled == true
  end
end
|
|
|
|
# Label of an existing key, or nil when there is no key.
defp existing_key_label(%Key{label: stored_label}), do: stored_label
defp existing_key_label(nil), do: nil
|
|
|
|
# Embedding backend module taken from the `:backend` entry of the `:bds`
# application's `:embeddings` config; defaults to the in-app backend.
defp configured_backend do
  embeddings_config = Application.get_env(:bds, :embeddings, [])
  Keyword.get(embeddings_config, :backend, BDS.Embeddings.Backends.InApp)
end
|
|
|
|
# Next unused label: one past the highest stored label, or 1 when no keys
# exist (`max_label_value/0` returns 0 in that case).
defp next_label do
  max_label_value() + 1
end
|
|
|
|
# Returns the body text of a post.
#
# A non-empty `content` field wins. Otherwise the post's file is read from the
# project's data directory; if the file contains a "\n---\n" separator, only
# the part after the first separator is returned, with trailing newlines
# removed. Returns "" when there is no file path or the file cannot be read.
defp resolve_post_body(%Post{content: content}) when is_binary(content) and content != "",
  do: content

defp resolve_post_body(%Post{file_path: file_path}) when file_path in [nil, ""], do: ""

defp resolve_post_body(%Post{project_id: project_id, file_path: file_path}) do
  data_dir =
    project_id
    |> Projects.get_project!()
    |> Projects.project_data_dir()

  with {:ok, contents} <- File.read(Path.join(data_dir, file_path)) do
    case String.split(contents, "\n---\n", parts: 2) do
      [_frontmatter, body] -> String.trim_trailing(body, "\n")
      _no_separator -> contents
    end
  else
    {:error, _reason} -> ""
  end
end
|
|
|
|
# Text that gets hashed and embedded: title and body separated by a blank
# line, with nil treated as "".
defp compose_embedding_source(title, content) do
  "#{string_or_empty(title)}\n\n#{string_or_empty(content)}"
end
|
|
|
|
# Passes binaries through and maps nil to "". Any other value raises a
# `FunctionClauseError`.
defp string_or_empty(value) when is_binary(value), do: value
defp string_or_empty(nil), do: ""
|
|
|
|
# Content hash of the post's current title and resolved body.
defp post_content_hash(%Post{} = post) do
  post.title
  |> compose_embedding_source(resolve_post_body(post))
  |> hash_text()
end
|
|
|
|
# Embeds `raw_text` with the configured backend. The text is prefixed with
# "query: " and the post language is passed as the `:language` option.
defp embed_text(raw_text, language) do
  backend = configured_backend()
  backend.embed("query: " <> raw_text, language: language)
end
|
|
|
|
# Rebuilds the project's index via `Index.rebuild/2`, passing the current
# model id and dimensions.
defp rebuild_snapshot(project_id) do
  snapshot_opts = [model_id: model_id(), dimensions: dimensions()]
  Index.rebuild(project_id, snapshot_opts)
end
|
|
|
|
# Progress callback derived from the caller's options.
defp progress_callback(opts) do
  ProgressReporter.callback(opts)
end
|
|
|
|
# Announces the start of a counted "Rebuilding" phase over `total` items.
defp report_rebuild_started(callback, total, label) do
  report_opts = [
    verb: "Rebuilding",
    start_progress: 0.0,
    empty_suffix: "to rebuild",
    message_style: :prefix_count
  ]

  ProgressReporter.report_count_started(callback, total, label, report_opts)
end
|
|
|
|
# Reports that item `current` of `total` is being processed in a "Rebuilding"
# phase.
defp report_rebuild_progress(callback, current, total, label) do
  report_opts = [
    verb: "Rebuilding",
    start_progress: 0.0,
    message_style: :prefix_count
  ]

  ProgressReporter.report_count_progress(callback, current, total, label, report_opts)
end
|
|
|
|
# Reports a named phase at the given progress value.
defp report_rebuild_phase(callback, value, label) do
  ProgressReporter.report_phase(callback, value, label)
end
|
|
|
|
# Content hash recorded for `post_id` in the project's snapshot, read from
# `snapshot["entries"][post_id]["content_hash"]`. Returns nil when the snapshot
# cannot be read or has no such entry.
defp snapshot_content_hash(project_id, post_id) do
  with {:ok, snapshot} <- Index.read(project_id) do
    get_in(snapshot, ["entries", post_id, "content_hash"])
  else
    _unreadable -> nil
  end
end
|
|
|
|
# Classifies a stored key against the expected content hash:
# "missing" (no key, or a nil/empty vector), "ready" (hash matches) or
# "stale" (hash differs).
defp current_embedding_status(nil, _expected_hash), do: "missing"
defp current_embedding_status(%Key{vector: nil}, _expected_hash), do: "missing"
defp current_embedding_status(%Key{vector: ""}, _expected_hash), do: "missing"

# The repeated variable only matches when stored and expected hash are equal.
defp current_embedding_status(%Key{content_hash: same_hash}, same_hash), do: "ready"
defp current_embedding_status(%Key{}, _expected_hash), do: "stale"
|
|
|
|
# Status the key should have: "ready" when it already is, otherwise
# "re-embed required".
defp expected_embedding_status(key, expected_hash) do
  if current_embedding_status(key, expected_hash) == "ready" do
    "ready"
  else
    "re-embed required"
  end
end
|
|
|
|
# Builds a difference entry for one field, or nil when both sides are equal
# after normalisation (nil counts as "").
defp diff_field(name, db_value, file_value) do
  case {normalize_diff_value(db_value), normalize_diff_value(file_value)} do
    {stored, expected} when stored == expected ->
      nil

    {stored, expected} ->
      %{name: name, db_value: stored, file_value: expected}
  end
end
|
|
|
|
# Maps nil to "" so that a missing value compares equal to an empty one;
# every other value is returned unchanged.
defp normalize_diff_value(nil), do: ""
defp normalize_diff_value(value), do: value
|
|
|
|
# Lowercase hex-encoded SHA-256 digest of `text`.
defp hash_text(text) do
  digest = :crypto.hash(:sha256, text)
  Base.encode16(digest, case: :lower)
end
|
|
|
|
# Decodes a JSON-encoded vector as stored on a key.
#
# A nil or empty-string vector yields `[]`. `current_embedding_status/2`
# already treats both as a missing embedding, whereas `Jason.decode!("")`
# would raise. Any other value is decoded with `Jason.decode!/1`, which raises
# on invalid JSON.
defp decode_vector(nil), do: []
defp decode_vector(""), do: []
defp decode_vector(vector), do: Jason.decode!(vector)
|
|
|
|
# Similarity score of two vectors: their dot product, clamped to be
# non-negative. Extra elements of the longer vector are ignored, and an empty
# vector on either side scores 0.0.
#
# NOTE(review): no normalisation happens here, so this equals the cosine only
# if the backend emits unit-length vectors — confirm.
defp cosine_similarity([], _other), do: 0.0
defp cosine_similarity(_vector, []), do: 0.0

defp cosine_similarity(left, right) do
  dot_product =
    Enum.zip_reduce(left, right, 0.0, fn left_value, right_value, acc ->
      acc + left_value * right_value
    end)

  max(dot_product, 0.0)
end
|
|
|
|
# `MapSet` of `pair_key/2` strings for every dismissed pair of the project.
defp dismissed_pair_keys(project_id) do
  query =
    from pair in DismissedDuplicatePair,
      where: pair.project_id == ^project_id,
      select: {pair.post_id_a, pair.post_id_b}

  for {post_id_a, post_id_b} <- Repo.all(query), into: MapSet.new() do
    pair_key(post_id_a, post_id_b)
  end
end
|
|
|
|
# Order-independent string key for a pair of post ids: the sorted ids joined
# with "::".
defp pair_key(post_id_a, post_id_b) do
  post_id_a
  |> sort_pair(post_id_b)
  |> Tuple.to_list()
  |> Enum.join("::")
end
|
|
|
|
# Returns the two ids as a tuple with the smaller one first.
defp sort_pair(post_id_a, post_id_b) do
  if post_id_a > post_id_b do
    {post_id_b, post_id_a}
  else
    {post_id_a, post_id_b}
  end
end
|
|
end
|