From 59833dcabea86ceb572b950af73d9dea07248e91 Mon Sep 17 00:00:00 2001 From: Chili Palmer Date: Mon, 27 Apr 2026 11:40:20 +0200 Subject: [PATCH] fix: more work on metadata diff --- lib/bds/embeddings.ex | 61 +++++++++++++++++++---- lib/bds/embeddings/index.ex | 65 +++++++++++++++++++------ lib/bds/maintenance.ex | 91 ++++++++++++++++++++++++++++++----- lib/bds/projects.ex | 27 +++++++++-- test/bds/embeddings_test.exs | 42 +++++++++++++++- test/bds/maintenance_test.exs | 5 +- test/test_helper.exs | 8 +++ 7 files changed, 255 insertions(+), 44 deletions(-) diff --git a/lib/bds/embeddings.ex b/lib/bds/embeddings.ex index 300785e..ed82d79 100644 --- a/lib/bds/embeddings.ex +++ b/lib/bds/embeddings.ex @@ -61,6 +61,25 @@ defmodule BDS.Embeddings do end end + def repair_posts(project_id, post_ids) when is_binary(project_id) and is_list(post_ids) do + if enabled_for_project?(project_id) do + post_ids = Enum.uniq(post_ids) + + posts = + Repo.all( + from post in Post, + where: post.project_id == ^project_id and post.id in ^post_ids, + order_by: [asc: post.created_at, asc: post.slug] + ) + + Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false)) + :ok = rebuild_snapshot(project_id) + {:ok, Enum.map(posts, & &1.id)} + else + {:ok, []} + end + end + def rebuild_project(project_id) when is_binary(project_id) do if enabled_for_project?(project_id) do posts = @@ -83,12 +102,6 @@ defmodule BDS.Embeddings do def diff_reports(project_id) when is_binary(project_id) do if enabled_for_project?(project_id) do - snapshot_entries = - case Index.read(project_id) do - {:ok, snapshot} -> Map.get(snapshot, "entries", %{}) - _other -> %{} - end - keys_by_post = Repo.all(from key in Key, where: key.project_id == ^project_id) |> Map.new(&{&1.post_id, &1}) @@ -97,15 +110,14 @@ defmodule BDS.Embeddings do |> Enum.flat_map(fn post -> expected_hash = post_content_hash(post) key = Map.get(keys_by_post, post.id) - snapshot_entry = Map.get(snapshot_entries, post.id) differences = [ diff_field("content_hash", key && key.content_hash, expected_hash), diff_field( - "snapshot_content_hash", - snapshot_entry && snapshot_entry["content_hash"], - key && key.content_hash + "embedding", + current_embedding_status(key, expected_hash), + expected_embedding_status(key, expected_hash) ) ] |> Enum.reject(&is_nil/1) @@ -136,6 +148,10 @@ defmodule BDS.Embeddings do case Repo.get_by(Key, post_id: post.id, project_id: post.project_id) do %Key{content_hash: ^content_hash} -> + if Keyword.get(opts, :refresh_index, true) and snapshot_content_hash(post.project_id, post.id) != content_hash do + :ok = rebuild_snapshot(post.project_id) + end + :ok existing_key -> @@ -485,6 +501,31 @@ defmodule BDS.Embeddings do Index.rebuild(project_id, model_id: model_id(), dimensions: dimensions()) end + defp snapshot_content_hash(project_id, post_id) do + case Index.read(project_id) do + {:ok, snapshot} -> get_in(snapshot, ["entries", post_id, "content_hash"]) + _other -> nil + end + end + + defp current_embedding_status(nil, _expected_hash), do: "missing" + + defp current_embedding_status(%Key{vector: vector}, _expected_hash) when vector in [nil, ""], + do: "missing" + + defp current_embedding_status(%Key{content_hash: content_hash}, expected_hash) + when content_hash != expected_hash, + do: "stale" + + defp current_embedding_status(%Key{}, _expected_hash), do: "ready" + + defp expected_embedding_status(key, expected_hash) do + case current_embedding_status(key, expected_hash) do + "ready" -> "ready" + _other -> "re-embed required" + end + end + defp diff_field(name, db_value, file_value) do db_value = if(is_binary(db_value), do: db_value, else: db_value || "") file_value = if(is_binary(file_value), do: file_value, else: file_value || "") diff --git a/lib/bds/embeddings/index.ex b/lib/bds/embeddings/index.ex index 5cee159..2be3a96 100644 --- a/lib/bds/embeddings/index.ex +++ b/lib/bds/embeddings/index.ex @@ -11,8 +11,7 @@ defmodule BDS.Embeddings.Index do @neighbor_limit 21 def path(project_id) when is_binary(project_id) do - project = Projects.get_project!(project_id) - Path.join(Projects.project_data_dir(project), "embeddings.usearch") + Path.join(Projects.project_cache_dir(project_id), "embeddings.usearch") end def rebuild(project_id, opts) when is_binary(project_id) and is_list(opts) do @@ -48,17 +47,13 @@ defmodule BDS.Embeddings.Index do "entries" => entries } - write_snapshot(path(project_id), payload) + write_snapshot(path(project_id), payload, project_id) end def read(project_id) when is_binary(project_id) do - snapshot_path = path(project_id) - - case File.read(snapshot_path) do - {:ok, contents} -> {:ok, Jason.decode!(contents)} - {:error, :enoent} -> read_legacy_snapshot(project_id) - {:error, reason} -> {:error, reason} - end + project_id + |> candidate_paths() + |> read_snapshot_paths() end def neighbors(project_id, post_id, limit) when is_binary(project_id) and is_binary(post_id) do @@ -123,7 +118,7 @@ defmodule BDS.Embeddings.Index do |> Enum.take(@neighbor_limit) end - defp write_snapshot(snapshot_path, payload) do + defp write_snapshot(snapshot_path, payload, project_id) do :ok = Persistence.atomic_write(snapshot_path, Jason.encode!(payload)) legacy_path = legacy_path(snapshot_path) @@ -131,19 +126,59 @@ defmodule BDS.Embeddings.Index do File.rm(legacy_path) end + cleanup_legacy_project_snapshots(project_id, snapshot_path) + :ok end - defp read_legacy_snapshot(project_id) do - legacy_snapshot_path = project_id |> path() |> legacy_path() + defp candidate_paths(project_id) do + current_snapshot_path = path(project_id) + legacy_project_snapshot_path = legacy_project_snapshot_path(project_id) - case File.read(legacy_snapshot_path) do + [ + current_snapshot_path, + legacy_path(current_snapshot_path), + legacy_project_snapshot_path, + legacy_project_snapshot_path && legacy_path(legacy_project_snapshot_path) + ] + |> Enum.filter(&is_binary/1) + |> Enum.uniq() + end + + defp read_snapshot_paths([]), do: {:error, :missing} + + defp read_snapshot_paths([snapshot_path | rest]) do + case File.read(snapshot_path) do {:ok, contents} -> {:ok, Jason.decode!(contents)} - {:error, :enoent} -> {:error, :missing} + {:error, :enoent} -> read_snapshot_paths(rest) {:error, reason} -> {:error, reason} end end + defp cleanup_legacy_project_snapshots(project_id, snapshot_path) do + current_paths = [snapshot_path, legacy_path(snapshot_path)] + + project_id + |> legacy_project_snapshot_path() + |> then(fn legacy_snapshot_path -> + [legacy_snapshot_path, legacy_snapshot_path && legacy_path(legacy_snapshot_path)] + end) + |> Enum.filter(&is_binary/1) + |> Enum.reject(&(&1 in current_paths)) + |> Enum.each(fn legacy_snapshot_path -> + if File.exists?(legacy_snapshot_path) do + File.rm(legacy_snapshot_path) + end + end) + end + + defp legacy_project_snapshot_path(project_id) do + case Projects.get_project(project_id) do + nil -> nil + project -> Path.join(Projects.project_data_dir(project), "embeddings.usearch") + end + end + defp legacy_path(snapshot_path) do Path.join(Path.dirname(snapshot_path), "embeddings.index.json") end diff --git a/lib/bds/maintenance.ex b/lib/bds/maintenance.ex index 4125522..0839846 100644 --- a/lib/bds/maintenance.ex +++ b/lib/bds/maintenance.ex @@ -26,20 +26,28 @@ defmodule BDS.Maintenance do total = length(items) :ok = report_started(on_progress, total, "Repairing metadata differences") - result = - items - |> Enum.with_index(1) - |> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc -> - next_acc = - case repair_metadata_diff_item(project_id, direction, item) do - :ok -> %{acc | repaired: acc.repaired + 1} - {:ok, _value} -> %{acc | repaired: acc.repaired + 1} - _error -> %{acc | failed: acc.failed + 1} - end + normalized_direction = normalize_repair_direction(direction) - :ok = report_progress(on_progress, index, total, "Repairing metadata differences") - next_acc - end) + result = + case repair_embedding_batch(project_id, normalized_direction, items, on_progress, total) do + {:ok, batch_result} -> + batch_result + + :unsupported -> + items + |> Enum.with_index(1) + |> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc -> + next_acc = + case repair_metadata_diff_item(project_id, normalized_direction, item) do + :ok -> %{acc | repaired: acc.repaired + 1} + {:ok, _value} -> %{acc | repaired: acc.repaired + 1} + _error -> %{acc | failed: acc.failed + 1} + end + + :ok = report_progress(on_progress, index, total, "Repairing metadata differences") + next_acc + end) + end {:ok, result} end @@ -670,6 +678,63 @@ defmodule BDS.Maintenance do end end + defp repair_embedding_batch(project_id, direction, items, on_progress, total) + when direction in [:file_to_db, :db_to_file] do + if items != [] and Enum.all?(items, &(metadata_diff_item_entity_type(&1) == "embedding")) do + result = + case direction do + :file_to_db -> + post_ids = Enum.map(items, &metadata_diff_item_entity_id/1) + + case Embeddings.repair_posts(project_id, post_ids) do + {:ok, repaired_post_ids} -> + repaired_post_ids = MapSet.new(repaired_post_ids) + + build_batch_repair_result(items, total, on_progress, fn item -> + MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item)) + end) + + _other -> + build_batch_repair_result(items, total, on_progress, fn _item -> false end) + end + + :db_to_file -> + repaired? = Embeddings.refresh_snapshot(project_id) == :ok + build_batch_repair_result(items, total, on_progress, fn _item -> repaired? end) + end + + {:ok, result} + else + :unsupported + end + end + + defp repair_embedding_batch(_project_id, _direction, _items, _on_progress, _total), do: :unsupported + + defp build_batch_repair_result(items, total, on_progress, repaired?) do + items + |> Enum.with_index(1) + |> Enum.reduce(%{repaired: 0, failed: 0}, fn {item, index}, acc -> + next_acc = + if repaired?.(item) do + %{acc | repaired: acc.repaired + 1} + else + %{acc | failed: acc.failed + 1} + end + + :ok = report_progress(on_progress, index, total, "Repairing metadata differences") + next_acc + end) + end + + defp metadata_diff_item_entity_type(item) do + Map.get(item, :entity_type) || Map.get(item, "entity_type") + end + + defp metadata_diff_item_entity_id(item) do + Map.get(item, :entity_id) || Map.get(item, "entity_id") + end + defp import_metadata_diff_orphan(project_id, orphan) do file_path = Map.get(orphan, :file_path) || Map.get(orphan, "file_path") diff --git a/lib/bds/projects.ex b/lib/bds/projects.ex index 6d1d832..7872a53 100644 --- a/lib/bds/projects.ex +++ b/lib/bds/projects.ex @@ -73,6 +73,12 @@ defmodule BDS.Projects do project.data_path || Path.expand("../../priv/data/projects/#{project.id}", __DIR__) end + def project_cache_dir(%Project{} = project), do: project_cache_dir(project.id) + + def project_cache_dir(project_id) when is_binary(project_id) do + Path.join([project_cache_root(), "projects", project_id]) + end + def create_project(attrs) do now = Persistence.now_ms() name = attr(attrs, :name) || "" @@ -140,6 +146,7 @@ defmodule BDS.Projects do %Project{} = project -> internal_dir = if is_nil(project.data_path), do: project_data_dir(project), else: nil + cleanup_dirs = [internal_dir, project_cache_dir(project)] |> Enum.filter(&is_binary/1) |> Enum.uniq() Repo.transaction(fn -> Repo.delete!(project) @@ -147,9 +154,9 @@ defmodule BDS.Projects do end) |> case do {:ok, deleted_project} -> - if is_binary(internal_dir) do - _ = File.rm_rf(internal_dir) - end + Enum.each(cleanup_dirs, fn dir -> + _ = File.rm_rf(dir) + end) {:ok, deleted_project} @@ -202,6 +209,20 @@ defmodule BDS.Projects do not Repo.exists?(from project in Project, where: project.slug == ^slug) end + defp repo_data_dir do + Application.fetch_env!(:bds, BDS.Repo) + |> Keyword.fetch!(:database) + |> Path.expand() + |> Path.dirname() + end + + defp project_cache_root do + case Application.get_env(:bds, :project_cache_root) do + root when is_binary(root) -> Path.expand(root) + _other -> repo_data_dir() + end + end + defp attr(attrs, key) do cond do Map.has_key?(attrs, key) -> Map.get(attrs, key) diff --git a/test/bds/embeddings_test.exs b/test/bds/embeddings_test.exs index 2c3a392..6a36a88 100644 --- a/test/bds/embeddings_test.exs +++ b/test/bds/embeddings_test.exs @@ -293,6 +293,11 @@ defmodule BDS.EmbeddingsTest do index_path = BDS.Embeddings.index_path(project.id) assert File.exists?(index_path) + refute String.starts_with?(index_path, BDS.Projects.project_data_dir(project)) + + cache_root = Application.fetch_env!(:bds, :project_cache_root) |> Path.expand() + + assert index_path == Path.join([cache_root, "projects", project.id, "embeddings.usearch"]) snapshot = index_path |> File.read!() |> Jason.decode!() assert snapshot["project_id"] == project.id @@ -306,7 +311,7 @@ defmodule BDS.EmbeddingsTest do end) end - test "embedding index uses the old-app persisted file name", %{project: project} do + test "embedding index uses the app-internal persisted file name", %{project: project} do assert BDS.Embeddings.index_path(project.id) =~ "/embeddings.usearch" end @@ -343,4 +348,39 @@ defmodule BDS.EmbeddingsTest do assert refreshed_key.content_hash == stale_key.content_hash assert File.exists?(BDS.Embeddings.index_path(project.id)) end + + test "sync_post refreshes snapshot drift when the embedding hash is already current", %{project: project} do + assert {:ok, _metadata} = + BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true}) + + assert {:ok, post} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Snapshot Repair", + content: "space rocket orbit mission galaxy", + language: "en" + }) + + assert {:ok, post} = BDS.Posts.publish_post(post.id) + assert {:ok, _indexed} = BDS.Embeddings.index_unindexed(project.id) + + key = BDS.Repo.get_by!(BDS.Embeddings.Key, project_id: project.id, post_id: post.id) + index_path = BDS.Embeddings.index_path(project.id) + + snapshot = index_path |> File.read!() |> Jason.decode!() + + drifted_snapshot = + put_in(snapshot, ["entries", post.id, "content_hash"], "stale-snapshot-hash") + + File.write!(index_path, Jason.encode!(drifted_snapshot)) + + refute Enum.any?(BDS.Embeddings.diff_reports(project.id), &(&1.entity_id == post.id)) + + assert :ok = BDS.Embeddings.sync_post(post.id) + + repaired_snapshot = index_path |> File.read!() |> Jason.decode!() + assert get_in(repaired_snapshot, ["entries", post.id, "content_hash"]) == key.content_hash + + refute Enum.any?(BDS.Embeddings.diff_reports(project.id), &(&1.entity_id == post.id)) + end end diff --git a/test/bds/maintenance_test.exs b/test/bds/maintenance_test.exs index 5edcdbb..df3ad20 100644 --- a/test/bds/maintenance_test.exs +++ b/test/bds/maintenance_test.exs @@ -390,12 +390,13 @@ defmodule BDS.MaintenanceTest do assert File.exists?(index_path) Repo.delete_all(from key in BDS.Embeddings.Key, where: key.project_id == ^project.id) - File.rm!(index_path) assert {:ok, %{diff_reports: diff_reports}} = BDS.Maintenance.metadata_diff(project.id) assert Enum.any?(diff_reports, fn report -> - report.entity_type == "embedding" and report.entity_id == post.id + report.entity_type == "embedding" and report.entity_id == post.id and + Enum.any?(report.differences, &(&1.name == "content_hash" and &1.file_value != "")) and + Enum.any?(report.differences, &(&1.name == "embedding" and &1.db_value == "missing" and &1.file_value == "re-embed required")) end) assert {:ok, rebuilt_post_ids} = BDS.Maintenance.rebuild_from_filesystem(project.id, "embedding") diff --git a/test/test_helper.exs b/test/test_helper.exs index 604fe29..778d41f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,2 +1,10 @@ +cache_root = Path.join(System.tmp_dir!(), "bds-test-cache-#{System.unique_integer([:positive])}") +File.mkdir_p!(cache_root) +Application.put_env(:bds, :project_cache_root, cache_root) + ExUnit.start() +ExUnit.after_suite(fn _results -> + File.rm_rf(cache_root) +end) + Ecto.Adapters.SQL.Sandbox.mode(BDS.Repo, :manual)