fix: more work on metadata diff
This commit is contained in:
@@ -61,6 +61,25 @@ defmodule BDS.Embeddings do
|
||||
end
|
||||
end
|
||||
|
||||
def repair_posts(project_id, post_ids) when is_binary(project_id) and is_list(post_ids) do
|
||||
if enabled_for_project?(project_id) do
|
||||
post_ids = Enum.uniq(post_ids)
|
||||
|
||||
posts =
|
||||
Repo.all(
|
||||
from post in Post,
|
||||
where: post.project_id == ^project_id and post.id in ^post_ids,
|
||||
order_by: [asc: post.created_at, asc: post.slug]
|
||||
)
|
||||
|
||||
Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false))
|
||||
:ok = rebuild_snapshot(project_id)
|
||||
{:ok, Enum.map(posts, & &1.id)}
|
||||
else
|
||||
{:ok, []}
|
||||
end
|
||||
end
|
||||
|
||||
def rebuild_project(project_id) when is_binary(project_id) do
|
||||
if enabled_for_project?(project_id) do
|
||||
posts =
|
||||
@@ -83,12 +102,6 @@ defmodule BDS.Embeddings do
|
||||
|
||||
def diff_reports(project_id) when is_binary(project_id) do
|
||||
if enabled_for_project?(project_id) do
|
||||
snapshot_entries =
|
||||
case Index.read(project_id) do
|
||||
{:ok, snapshot} -> Map.get(snapshot, "entries", %{})
|
||||
_other -> %{}
|
||||
end
|
||||
|
||||
keys_by_post =
|
||||
Repo.all(from key in Key, where: key.project_id == ^project_id)
|
||||
|> Map.new(&{&1.post_id, &1})
|
||||
@@ -97,15 +110,14 @@ defmodule BDS.Embeddings do
|
||||
|> Enum.flat_map(fn post ->
|
||||
expected_hash = post_content_hash(post)
|
||||
key = Map.get(keys_by_post, post.id)
|
||||
snapshot_entry = Map.get(snapshot_entries, post.id)
|
||||
|
||||
differences =
|
||||
[
|
||||
diff_field("content_hash", key && key.content_hash, expected_hash),
|
||||
diff_field(
|
||||
"snapshot_content_hash",
|
||||
snapshot_entry && snapshot_entry["content_hash"],
|
||||
key && key.content_hash
|
||||
"embedding",
|
||||
current_embedding_status(key, expected_hash),
|
||||
expected_embedding_status(key, expected_hash)
|
||||
)
|
||||
]
|
||||
|> Enum.reject(&is_nil/1)
|
||||
@@ -136,6 +148,10 @@ defmodule BDS.Embeddings do
|
||||
|
||||
case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do
|
||||
%Key{content_hash: ^content_hash} ->
|
||||
if Keyword.get(opts, :refresh_index, true) and snapshot_content_hash(post.project_id, post.id) != content_hash do
|
||||
:ok = rebuild_snapshot(post.project_id)
|
||||
end
|
||||
|
||||
:ok
|
||||
|
||||
existing_key ->
|
||||
@@ -485,6 +501,31 @@ defmodule BDS.Embeddings do
|
||||
Index.rebuild(project_id, model_id: model_id(), dimensions: dimensions())
|
||||
end
|
||||
|
||||
defp snapshot_content_hash(project_id, post_id) do
|
||||
case Index.read(project_id) do
|
||||
{:ok, snapshot} -> get_in(snapshot, ["entries", post_id, "content_hash"])
|
||||
_other -> nil
|
||||
end
|
||||
end
|
||||
|
||||
defp current_embedding_status(nil, _expected_hash), do: "missing"
|
||||
|
||||
defp current_embedding_status(%Key{vector: vector}, _expected_hash) when vector in [nil, ""],
|
||||
do: "missing"
|
||||
|
||||
defp current_embedding_status(%Key{content_hash: content_hash}, expected_hash)
|
||||
when content_hash != expected_hash,
|
||||
do: "stale"
|
||||
|
||||
defp current_embedding_status(%Key{}, _expected_hash), do: "ready"
|
||||
|
||||
defp expected_embedding_status(key, expected_hash) do
|
||||
case current_embedding_status(key, expected_hash) do
|
||||
"ready" -> "ready"
|
||||
_other -> "re-embed required"
|
||||
end
|
||||
end
|
||||
|
||||
defp diff_field(name, db_value, file_value) do
|
||||
db_value = if(is_binary(db_value), do: db_value, else: db_value || "")
|
||||
file_value = if(is_binary(file_value), do: file_value, else: file_value || "")
|
||||
|
||||
@@ -11,8 +11,7 @@ defmodule BDS.Embeddings.Index do
|
||||
@neighbor_limit 21
|
||||
|
||||
def path(project_id) when is_binary(project_id) do
|
||||
project = Projects.get_project!(project_id)
|
||||
Path.join(Projects.project_data_dir(project), "embeddings.usearch")
|
||||
Path.join(Projects.project_cache_dir(project_id), "embeddings.usearch")
|
||||
end
|
||||
|
||||
def rebuild(project_id, opts) when is_binary(project_id) and is_list(opts) do
|
||||
@@ -48,17 +47,13 @@ defmodule BDS.Embeddings.Index do
|
||||
"entries" => entries
|
||||
}
|
||||
|
||||
write_snapshot(path(project_id), payload)
|
||||
write_snapshot(path(project_id), payload, project_id)
|
||||
end
|
||||
|
||||
def read(project_id) when is_binary(project_id) do
|
||||
snapshot_path = path(project_id)
|
||||
|
||||
case File.read(snapshot_path) do
|
||||
{:ok, contents} -> {:ok, Jason.decode!(contents)}
|
||||
{:error, :enoent} -> read_legacy_snapshot(project_id)
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
project_id
|
||||
|> candidate_paths()
|
||||
|> read_snapshot_paths()
|
||||
end
|
||||
|
||||
def neighbors(project_id, post_id, limit) when is_binary(project_id) and is_binary(post_id) do
|
||||
@@ -123,7 +118,7 @@ defmodule BDS.Embeddings.Index do
|
||||
|> Enum.take(@neighbor_limit)
|
||||
end
|
||||
|
||||
defp write_snapshot(snapshot_path, payload) do
|
||||
defp write_snapshot(snapshot_path, payload, project_id) do
|
||||
:ok = Persistence.atomic_write(snapshot_path, Jason.encode!(payload))
|
||||
legacy_path = legacy_path(snapshot_path)
|
||||
|
||||
@@ -131,19 +126,59 @@ defmodule BDS.Embeddings.Index do
|
||||
File.rm(legacy_path)
|
||||
end
|
||||
|
||||
cleanup_legacy_project_snapshots(project_id, snapshot_path)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
defp read_legacy_snapshot(project_id) do
|
||||
legacy_snapshot_path = project_id |> path() |> legacy_path()
|
||||
defp candidate_paths(project_id) do
|
||||
current_snapshot_path = path(project_id)
|
||||
legacy_project_snapshot_path = legacy_project_snapshot_path(project_id)
|
||||
|
||||
case File.read(legacy_snapshot_path) do
|
||||
[
|
||||
current_snapshot_path,
|
||||
legacy_path(current_snapshot_path),
|
||||
legacy_project_snapshot_path,
|
||||
legacy_project_snapshot_path && legacy_path(legacy_project_snapshot_path)
|
||||
]
|
||||
|> Enum.filter(&is_binary/1)
|
||||
|> Enum.uniq()
|
||||
end
|
||||
|
||||
defp read_snapshot_paths([]), do: {:error, :missing}
|
||||
|
||||
defp read_snapshot_paths([snapshot_path | rest]) do
|
||||
case File.read(snapshot_path) do
|
||||
{:ok, contents} -> {:ok, Jason.decode!(contents)}
|
||||
{:error, :enoent} -> {:error, :missing}
|
||||
{:error, :enoent} -> read_snapshot_paths(rest)
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp cleanup_legacy_project_snapshots(project_id, snapshot_path) do
|
||||
current_paths = [snapshot_path, legacy_path(snapshot_path)]
|
||||
|
||||
project_id
|
||||
|> legacy_project_snapshot_path()
|
||||
|> then(fn legacy_snapshot_path ->
|
||||
[legacy_snapshot_path, legacy_snapshot_path && legacy_path(legacy_snapshot_path)]
|
||||
end)
|
||||
|> Enum.filter(&is_binary/1)
|
||||
|> Enum.reject(&(&1 in current_paths))
|
||||
|> Enum.each(fn legacy_snapshot_path ->
|
||||
if File.exists?(legacy_snapshot_path) do
|
||||
File.rm(legacy_snapshot_path)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp legacy_project_snapshot_path(project_id) do
|
||||
case Projects.get_project(project_id) do
|
||||
nil -> nil
|
||||
project -> Path.join(Projects.project_data_dir(project), "embeddings.usearch")
|
||||
end
|
||||
end
|
||||
|
||||
defp legacy_path(snapshot_path) do
|
||||
Path.join(Path.dirname(snapshot_path), "embeddings.index.json")
|
||||
end
|
||||
|
||||
@@ -26,20 +26,28 @@ defmodule BDS.Maintenance do
|
||||
total = length(items)
|
||||
:ok = report_started(on_progress, total, "Repairing metadata differences")
|
||||
|
||||
result =
|
||||
items
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc ->
|
||||
next_acc =
|
||||
case repair_metadata_diff_item(project_id, direction, item) do
|
||||
:ok -> %{acc | repaired: acc.repaired + 1}
|
||||
{:ok, _value} -> %{acc | repaired: acc.repaired + 1}
|
||||
_error -> %{acc | failed: acc.failed + 1}
|
||||
end
|
||||
normalized_direction = normalize_repair_direction(direction)
|
||||
|
||||
:ok = report_progress(on_progress, index, total, "Repairing metadata differences")
|
||||
next_acc
|
||||
end)
|
||||
result =
|
||||
case repair_embedding_batch(project_id, normalized_direction, items, on_progress, total) do
|
||||
{:ok, batch_result} ->
|
||||
batch_result
|
||||
|
||||
:unsupported ->
|
||||
items
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc ->
|
||||
next_acc =
|
||||
case repair_metadata_diff_item(project_id, normalized_direction, item) do
|
||||
:ok -> %{acc | repaired: acc.repaired + 1}
|
||||
{:ok, _value} -> %{acc | repaired: acc.repaired + 1}
|
||||
_error -> %{acc | failed: acc.failed + 1}
|
||||
end
|
||||
|
||||
:ok = report_progress(on_progress, index, total, "Repairing metadata differences")
|
||||
next_acc
|
||||
end)
|
||||
end
|
||||
|
||||
{:ok, result}
|
||||
end
|
||||
@@ -670,6 +678,63 @@ defmodule BDS.Maintenance do
|
||||
end
|
||||
end
|
||||
|
||||
defp repair_embedding_batch(project_id, direction, items, on_progress, total)
|
||||
when direction in [:file_to_db, :db_to_file] do
|
||||
if items != [] and Enum.all?(items, &(metadata_diff_item_entity_type(&1) == "embedding")) do
|
||||
result =
|
||||
case direction do
|
||||
:file_to_db ->
|
||||
post_ids = Enum.map(items, &metadata_diff_item_entity_id/1)
|
||||
|
||||
case Embeddings.repair_posts(project_id, post_ids) do
|
||||
{:ok, repaired_post_ids} ->
|
||||
repaired_post_ids = MapSet.new(repaired_post_ids)
|
||||
|
||||
build_batch_repair_result(items, total, on_progress, fn item ->
|
||||
MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item))
|
||||
end)
|
||||
|
||||
_other ->
|
||||
build_batch_repair_result(items, total, on_progress, fn _item -> false end)
|
||||
end
|
||||
|
||||
:db_to_file ->
|
||||
repaired? = Embeddings.refresh_snapshot(project_id) == :ok
|
||||
build_batch_repair_result(items, total, on_progress, fn _item -> repaired? end)
|
||||
end
|
||||
|
||||
{:ok, result}
|
||||
else
|
||||
:unsupported
|
||||
end
|
||||
end
|
||||
|
||||
defp repair_embedding_batch(_project_id, _direction, _items, _on_progress, _total), do: :unsupported
|
||||
|
||||
defp build_batch_repair_result(items, total, on_progress, repaired?) do
|
||||
items
|
||||
|> Enum.with_index(1)
|
||||
|> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc ->
|
||||
next_acc =
|
||||
if repaired?.(item) do
|
||||
%{acc | repaired: acc.repaired + 1}
|
||||
else
|
||||
%{acc | failed: acc.failed + 1}
|
||||
end
|
||||
|
||||
:ok = report_progress(on_progress, index, total, "Repairing metadata differences")
|
||||
next_acc
|
||||
end)
|
||||
end
|
||||
|
||||
defp metadata_diff_item_entity_type(item) do
|
||||
Map.get(item, :entity_type) || Map.get(item, "entity_type")
|
||||
end
|
||||
|
||||
defp metadata_diff_item_entity_id(item) do
|
||||
Map.get(item, :entity_id) || Map.get(item, "entity_id")
|
||||
end
|
||||
|
||||
defp import_metadata_diff_orphan(project_id, orphan) do
|
||||
file_path = Map.get(orphan, :file_path) || Map.get(orphan, "file_path")
|
||||
|
||||
|
||||
@@ -73,6 +73,12 @@ defmodule BDS.Projects do
|
||||
project.data_path || Path.expand("../../priv/data/projects/#{project.id}", __DIR__)
|
||||
end
|
||||
|
||||
def project_cache_dir(%Project{} = project), do: project_cache_dir(project.id)
|
||||
|
||||
def project_cache_dir(project_id) when is_binary(project_id) do
|
||||
Path.join([project_cache_root(), "projects", project_id])
|
||||
end
|
||||
|
||||
def create_project(attrs) do
|
||||
now = Persistence.now_ms()
|
||||
name = attr(attrs, :name) || ""
|
||||
@@ -140,6 +146,7 @@ defmodule BDS.Projects do
|
||||
|
||||
%Project{} = project ->
|
||||
internal_dir = if is_nil(project.data_path), do: project_data_dir(project), else: nil
|
||||
cleanup_dirs = [internal_dir, project_cache_dir(project)] |> Enum.filter(&is_binary/1) |> Enum.uniq()
|
||||
|
||||
Repo.transaction(fn ->
|
||||
Repo.delete!(project)
|
||||
@@ -147,9 +154,9 @@ defmodule BDS.Projects do
|
||||
end)
|
||||
|> case do
|
||||
{:ok, deleted_project} ->
|
||||
if is_binary(internal_dir) do
|
||||
_ = File.rm_rf(internal_dir)
|
||||
end
|
||||
Enum.each(cleanup_dirs, fn dir ->
|
||||
_ = File.rm_rf(dir)
|
||||
end)
|
||||
|
||||
{:ok, deleted_project}
|
||||
|
||||
@@ -202,6 +209,20 @@ defmodule BDS.Projects do
|
||||
not Repo.exists?(from project in Project, where: project.slug == ^slug)
|
||||
end
|
||||
|
||||
defp repo_data_dir do
|
||||
Application.fetch_env!(:bds, BDS.Repo)
|
||||
|> Keyword.fetch!(:database)
|
||||
|> Path.expand()
|
||||
|> Path.dirname()
|
||||
end
|
||||
|
||||
defp project_cache_root do
|
||||
case Application.get_env(:bds, :project_cache_root) do
|
||||
root when is_binary(root) -> Path.expand(root)
|
||||
_other -> repo_data_dir()
|
||||
end
|
||||
end
|
||||
|
||||
defp attr(attrs, key) do
|
||||
cond do
|
||||
Map.has_key?(attrs, key) -> Map.get(attrs, key)
|
||||
|
||||
@@ -293,6 +293,11 @@ defmodule BDS.EmbeddingsTest do
|
||||
|
||||
index_path = BDS.Embeddings.index_path(project.id)
|
||||
assert File.exists?(index_path)
|
||||
refute String.starts_with?(index_path, BDS.Projects.project_data_dir(project))
|
||||
|
||||
cache_root = Application.fetch_env!(:bds, :project_cache_root) |> Path.expand()
|
||||
|
||||
assert index_path == Path.join([cache_root, "projects", project.id, "embeddings.usearch"])
|
||||
|
||||
snapshot = index_path |> File.read!() |> Jason.decode!()
|
||||
assert snapshot["project_id"] == project.id
|
||||
@@ -306,7 +311,7 @@ defmodule BDS.EmbeddingsTest do
|
||||
end)
|
||||
end
|
||||
|
||||
test "embedding index uses the old-app persisted file name", %{project: project} do
|
||||
test "embedding index uses the app-internal persisted file name", %{project: project} do
|
||||
assert BDS.Embeddings.index_path(project.id) =~ "/embeddings.usearch"
|
||||
end
|
||||
|
||||
@@ -343,4 +348,39 @@ defmodule BDS.EmbeddingsTest do
|
||||
assert refreshed_key.content_hash == stale_key.content_hash
|
||||
assert File.exists?(BDS.Embeddings.index_path(project.id))
|
||||
end
|
||||
|
||||
test "sync_post refreshes snapshot drift when the embedding hash is already current", %{project: project} do
|
||||
assert {:ok, _metadata} =
|
||||
BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true})
|
||||
|
||||
assert {:ok, post} =
|
||||
BDS.Posts.create_post(%{
|
||||
project_id: project.id,
|
||||
title: "Snapshot Repair",
|
||||
content: "space rocket orbit mission galaxy",
|
||||
language: "en"
|
||||
})
|
||||
|
||||
assert {:ok, post} = BDS.Posts.publish_post(post.id)
|
||||
assert {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id)
|
||||
|
||||
key = BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id)
|
||||
index_path = BDS.Embeddings.index_path(project.id)
|
||||
|
||||
snapshot = index_path |> File.read!() |> Jason.decode!()
|
||||
|
||||
drifted_snapshot =
|
||||
put_in(snapshot, ["entries", post.id, "content_hash"], "stale-snapshot-hash")
|
||||
|
||||
File.write!(index_path, Jason.encode!(drifted_snapshot))
|
||||
|
||||
refute Enum.any?(BDS.Embeddings.diff_reports(project.id), &(&1.entity_id == post.id))
|
||||
|
||||
assert :ok = BDS.Embeddings.sync_post(post.id)
|
||||
|
||||
repaired_snapshot = index_path |> File.read!() |> Jason.decode!()
|
||||
assert get_in(repaired_snapshot, ["entries", post.id, "content_hash"]) == key.content_hash
|
||||
|
||||
refute Enum.any?(BDS.Embeddings.diff_reports(project.id), &(&1.entity_id == post.id))
|
||||
end
|
||||
end
|
||||
|
||||
@@ -390,12 +390,13 @@ defmodule BDS.MaintenanceTest do
|
||||
assert File.exists?(index_path)
|
||||
|
||||
Repo.delete_all(from key in BDS.Embeddings.Key, where: key.project_id == ^project.id)
|
||||
File.rm!(index_path)
|
||||
|
||||
assert {:ok, %{diff_reports: diff_reports}} = BDS.Maintenance.metadata_diff(project.id)
|
||||
|
||||
assert Enum.any?(diff_reports, fn report ->
|
||||
report.entity_type == "embedding" and report.entity_id == post.id
|
||||
report.entity_type == "embedding" and report.entity_id == post.id and
|
||||
Enum.any?(report.differences, &(&1.name == "content_hash" and &1.file_value != "")) and
|
||||
Enum.any?(report.differences, &(&1.name == "embedding" and &1.db_value == "missing" and &1.file_value == "re-embed required"))
|
||||
end)
|
||||
|
||||
assert {:ok, rebuilt_post_ids} = BDS.Maintenance.rebuild_from_filesystem(project.id, "embedding")
|
||||
|
||||
@@ -1,2 +1,10 @@
|
||||
cache_root = Path.join(System.tmp_dir!(), "bds-test-cache-#{System.unique_integer([:positive])}")
|
||||
File.mkdir_p!(cache_root)
|
||||
Application.put_env(:bds, :project_cache_root, cache_root)
|
||||
|
||||
ExUnit.start()
|
||||
ExUnit.after_suite(fn _results ->
|
||||
File.rm_rf(cache_root)
|
||||
end)
|
||||
|
||||
Ecto.Adapters.SQL.Sandbox.mode(BDS.Repo, :manual)
|
||||
|
||||
Reference in New Issue
Block a user