fix: fixed progress on embedding rebuild

This commit is contained in:
2026-04-27 12:03:42 +02:00
parent 59833dcabe
commit ce3a572a0c
9 changed files with 404 additions and 99 deletions

View File

@@ -81,16 +81,14 @@ defmodule BDS.Desktop.ShellCommands do
{:ok, posts_task} = {:ok, posts_task} =
Tasks.submit_task("Reindex Search Text", fn report -> Tasks.submit_task("Reindex Search Text", fn report ->
report.(0.0, "Clearing and rebuilding post search indexes") :ok = Search.reindex_posts(project.id, on_progress: report)
:ok = Search.reindex_posts(project.id)
report.(1.0, "Post search text reindexed") report.(1.0, "Post search text reindexed")
%{project_id: project.id, entity: "posts"} %{project_id: project.id, entity: "posts"}
end, attrs) end, attrs)
{:ok, _media_task} = {:ok, _media_task} =
Tasks.submit_task("Reindex Media Search Text", fn report -> Tasks.submit_task("Reindex Media Search Text", fn report ->
report.(0.0, "Clearing and rebuilding media search indexes") :ok = Search.reindex_media(project.id, on_progress: report)
:ok = Search.reindex_media(project.id)
report.(1.0, "Media search text reindexed") report.(1.0, "Media search text reindexed")
%{project_id: project.id, entity: "media"} %{project_id: project.id, entity: "media"}
end, attrs) end, attrs)
@@ -110,8 +108,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("rebuild_embedding_index", project, _params) do defp dispatch("rebuild_embedding_index", project, _params) do
queue_task(project, "rebuild_embedding_index", "Rebuild Embedding Index", "Embeddings", fn report -> queue_task(project, "rebuild_embedding_index", "Rebuild Embedding Index", "Embeddings", fn report ->
report.(0.2, "Rebuilding semantic index") {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id, on_progress: report)
{:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id)
report.(1.0, "Embedding index rebuilt") report.(1.0, "Embedding index rebuilt")
%{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)} %{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)}
end) end)
@@ -151,8 +148,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("rebuild_post_links", project, _params) do defp dispatch("rebuild_post_links", project, _params) do
queue_task(project, "rebuild_post_links", "Rebuild Post Links", "Maintenance", fn report -> queue_task(project, "rebuild_post_links", "Rebuild Post Links", "Maintenance", fn report ->
report.(0.0, "Rebuilding link graph") :ok = Posts.rebuild_post_links(project.id, on_progress: report)
:ok = Posts.rebuild_post_links(project.id)
report.(1.0, "Post links rebuilt") report.(1.0, "Post links rebuilt")
%{project_id: project.id} %{project_id: project.id}
end) end)
@@ -160,8 +156,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("regenerate_missing_thumbnails", project, _params) do defp dispatch("regenerate_missing_thumbnails", project, _params) do
queue_task(project, "regenerate_missing_thumbnails", "Regenerate Missing Thumbnails", "Maintenance", fn report -> queue_task(project, "regenerate_missing_thumbnails", "Regenerate Missing Thumbnails", "Maintenance", fn report ->
report.(0.0, "Checking missing thumbnails") result = BDS.Media.regenerate_missing_thumbnails(project.id, on_progress: report)
result = BDS.Media.regenerate_missing_thumbnails(project.id)
report.(1.0, "Missing thumbnails regenerated") report.(1.0, "Missing thumbnails regenerated")
Map.put(result, :project_id, project.id) Map.put(result, :project_id, project.id)
end) end)
@@ -197,8 +192,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("generate_sitemap", project, _params) do defp dispatch("generate_sitemap", project, _params) do
queue_task(project, "generate_sitemap", "Generate Sitemap", "Generation", fn report -> queue_task(project, "generate_sitemap", "Generate Sitemap", "Generation", fn report ->
report.(0.2, "Generating site output") {:ok, generation} = Generation.generate_site(project.id, @site_sections, on_progress: report)
{:ok, generation} = Generation.generate_site(project.id, @site_sections)
report.(1.0, "Generated site output") report.(1.0, "Generated site output")
%{project_id: project.id, sections: generation.sections, generated_count: length(generation.generated_files)} %{project_id: project.id, sections: generation.sections, generated_count: length(generation.generated_files)}
end) end)
@@ -206,8 +200,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("validate_site", project, _params) do defp dispatch("validate_site", project, _params) do
queue_task(project, "validate_site", "Validate Site", "Validation", fn report -> queue_task(project, "validate_site", "Validate Site", "Validation", fn report ->
report.(0.2, "Validating generated site output") {:ok, validation} = Generation.validate_site(project.id, @site_sections, on_progress: report)
{:ok, validation} = Generation.validate_site(project.id, @site_sections)
report.(1.0, "Site validation complete") report.(1.0, "Site validation complete")
site_validation_result(project.id, validation) site_validation_result(project.id, validation)
end) end)
@@ -270,8 +263,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("validate_translations", project, _params) do defp dispatch("validate_translations", project, _params) do
queue_task(project, "validate_translations", "Validate Translations", "Validation", fn report -> queue_task(project, "validate_translations", "Validate Translations", "Validation", fn report ->
report.(0.2, "Checking published translations") {:ok, translation_report} = Posts.validate_translations(project.id, on_progress: report)
{:ok, translation_report} = Posts.validate_translations(project.id)
report.(1.0, "Translation validation complete") report.(1.0, "Translation validation complete")
translation_validation_result(project.id, translation_report) translation_validation_result(project.id, translation_report)
end) end)
@@ -279,8 +271,7 @@ defmodule BDS.Desktop.ShellCommands do
defp dispatch("find_duplicates", project, _params) do defp dispatch("find_duplicates", project, _params) do
queue_task(project, "find_duplicates", "Find Duplicate Posts", "Embeddings", fn report -> queue_task(project, "find_duplicates", "Find Duplicate Posts", "Embeddings", fn report ->
report.(0.2, "Checking for duplicate posts") {:ok, pairs} = Embeddings.find_duplicates(project.id, on_progress: report)
{:ok, pairs} = Embeddings.find_duplicates(project.id)
report.(1.0, "Duplicate search complete") report.(1.0, "Duplicate search complete")
duplicate_search_result(project.id, pairs) duplicate_search_result(project.id, pairs)
end) end)
@@ -376,8 +367,7 @@ defmodule BDS.Desktop.ShellCommands do
%{ %{
name: "Rebuild Post Links", name: "Rebuild Post Links",
work: fn report -> work: fn report ->
report.(0.0, "Rebuilding link graph") :ok = Posts.rebuild_post_links(project.id, on_progress: report)
:ok = Posts.rebuild_post_links(project.id)
report.(1.0, "Post links rebuilt") report.(1.0, "Post links rebuilt")
%{project_id: project.id} %{project_id: project.id}
end end
@@ -385,8 +375,7 @@ defmodule BDS.Desktop.ShellCommands do
%{ %{
name: "Regenerate Missing Thumbnails", name: "Regenerate Missing Thumbnails",
work: fn report -> work: fn report ->
report.(0.0, "Checking missing thumbnails") result = BDS.Media.regenerate_missing_thumbnails(project.id, on_progress: report)
result = BDS.Media.regenerate_missing_thumbnails(project.id)
report.(1.0, "Missing thumbnails regenerated") report.(1.0, "Missing thumbnails regenerated")
Map.put(result, :project_id, project.id) Map.put(result, :project_id, project.id)
end end
@@ -394,8 +383,7 @@ defmodule BDS.Desktop.ShellCommands do
%{ %{
name: "Rebuild Embedding Index", name: "Rebuild Embedding Index",
work: fn report -> work: fn report ->
report.(0.0, "Rebuilding semantic index") {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id, on_progress: report)
{:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id)
report.(1.0, "Embedding index rebuilt") report.(1.0, "Embedding index rebuilt")
%{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)} %{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)}
end end

View File

@@ -80,19 +80,33 @@ defmodule BDS.Embeddings do
end end
end end
def rebuild_project(project_id) when is_binary(project_id) do def rebuild_project(project_id, opts \\ [])
def rebuild_project(project_id, opts) when is_binary(project_id) and is_list(opts) do
if enabled_for_project?(project_id) do if enabled_for_project?(project_id) do
on_progress = progress_callback(opts)
posts = posts =
Repo.all(from post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at, asc: post.slug]) Repo.all(from post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at, asc: post.slug])
post_ids = Enum.map(posts, & &1.id) post_ids = Enum.map(posts, & &1.id)
total_posts = length(posts)
:ok = report_rebuild_started(on_progress, total_posts, "embedding entries")
Repo.delete_all( Repo.delete_all(
from key in Key, from key in Key,
where: key.project_id == ^project_id and key.post_id not in ^post_ids where: key.project_id == ^project_id and key.post_id not in ^post_ids
) )
Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false)) posts
|> Enum.with_index(1)
|> Enum.each(fn {post, index} ->
sync_post_if_enabled(post, refresh_index: false)
:ok = report_rebuild_progress(on_progress, index, total_posts, "embedding entries")
end)
:ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot")
:ok = rebuild_snapshot(project_id) :ok = rebuild_snapshot(project_id)
{:ok, post_ids} {:ok, post_ids}
else else
@@ -293,12 +307,13 @@ defmodule BDS.Embeddings do
end end
end end
def find_duplicates(project_id) when is_binary(project_id) do def find_duplicates(project_id, opts \\ []) when is_binary(project_id) do
if enabled_for_project?(project_id) do if enabled_for_project?(project_id) do
on_progress = progress_callback(opts)
dismissed = dismissed_pair_keys(project_id) dismissed = dismissed_pair_keys(project_id)
duplicates = duplicates =
case Index.duplicate_pairs(project_id, @duplicate_threshold) do case Index.duplicate_pairs(project_id, @duplicate_threshold, on_progress: on_progress) do
{:ok, pairs} -> {:ok, pairs} ->
pairs pairs
|> Enum.reject(fn pair -> pair_key(pair.post_id_a, pair.post_id_b) in dismissed end) |> Enum.reject(fn pair -> pair_key(pair.post_id_a, pair.post_id_b) in dismissed end)
@@ -306,9 +321,16 @@ defmodule BDS.Embeddings do
{:error, :missing} -> {:error, :missing} ->
keys = Repo.all(from key in Key, where: key.project_id == ^project_id, order_by: [asc: key.post_id]) keys = Repo.all(from key in Key, where: key.project_id == ^project_id, order_by: [asc: key.post_id])
total_keys = length(keys)
for left <- keys, :ok = report_rebuild_started(on_progress, total_keys, "embedding entries")
right <- keys,
keys
|> Enum.with_index(1)
|> Enum.flat_map(fn {left, index} ->
:ok = report_rebuild_progress(on_progress, index, total_keys, "embedding entries")
for right <- keys,
left.post_id < right.post_id, left.post_id < right.post_id,
pair_key(left.post_id, right.post_id) not in dismissed, pair_key(left.post_id, right.post_id) not in dismissed,
similarity = cosine_similarity(decode_vector(left.vector), decode_vector(right.vector)), similarity = cosine_similarity(decode_vector(left.vector), decode_vector(right.vector)),
@@ -319,9 +341,11 @@ defmodule BDS.Embeddings do
score: similarity score: similarity
} }
end end
end)
|> enrich_duplicate_pairs(project_id) |> enrich_duplicate_pairs(project_id)
end end
:ok = report_rebuild_phase(on_progress, 0.99, "Resolving duplicate candidates")
{:ok, duplicates} {:ok, duplicates}
else else
{:ok, []} {:ok, []}
@@ -501,6 +525,40 @@ defmodule BDS.Embeddings do
Index.rebuild(project_id, model_id: model_id(), dimensions: dimensions()) Index.rebuild(project_id, model_id: model_id(), dimensions: dimensions())
end end
# Pulls the optional `:on_progress` reporter out of the keyword options.
#
# Returns the stored value only when it is a 2-arity function; a missing
# key or any other value yields `nil`.
defp progress_callback(opts) do
  reporter = Keyword.get(opts, :on_progress)

  if is_function(reporter, 2), do: reporter, else: nil
end
# Emits the initial progress event for a rebuild.
#
# * no reporter (`nil`)  -> nothing is emitted
# * `total` is integer 0 -> reports 1.0 with a "No ... to rebuild" message
# * otherwise            -> reports 0.0 with a "Rebuilding 0/total ..." message
#
# Always returns `:ok`.
defp report_rebuild_started(callback, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {reporter, 0} ->
      reporter.(1.0, "No #{label} to rebuild")
      :ok

    {reporter, count} ->
      reporter.(0.0, "Rebuilding 0/#{count} #{label}")
      :ok
  end
end
# Emits a per-item progress event as `current / total` together with a
# "Rebuilding current/total label" message.
#
# Nothing is emitted when there is no reporter or when `total` is the
# integer 0 (which also avoids dividing by zero). Always returns `:ok`.
defp report_rebuild_progress(callback, current, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {_reporter, 0} ->
      :ok

    {reporter, _total} ->
      reporter.(current / total, "Rebuilding #{current}/#{total} #{label}")
      :ok
  end
end
# Forwards an explicit progress `value` and `label` to the reporter.
# Does nothing when the reporter is `nil`. Always returns `:ok`.
defp report_rebuild_phase(callback, value, label) do
  case callback do
    nil ->
      :ok

    reporter ->
      reporter.(value, label)
      :ok
  end
end
defp snapshot_content_hash(project_id, post_id) do defp snapshot_content_hash(project_id, post_id) do
case Index.read(project_id) do case Index.read(project_id) do
{:ok, snapshot} -> get_in(snapshot, ["entries", post_id, "content_hash"]) {:ok, snapshot} -> get_in(snapshot, ["entries", post_id, "content_hash"])
@@ -527,8 +585,8 @@ defmodule BDS.Embeddings do
end end
defp diff_field(name, db_value, file_value) do defp diff_field(name, db_value, file_value) do
db_value = if(is_binary(db_value), do: db_value, else: db_value || "") db_value = normalize_diff_value(db_value)
file_value = if(is_binary(file_value), do: file_value, else: file_value || "") file_value = normalize_diff_value(file_value)
if db_value == file_value do if db_value == file_value do
nil nil
@@ -537,6 +595,10 @@ defmodule BDS.Embeddings do
end end
end end
# Normalizes a field value before comparison: `nil` becomes the empty
# string, every other value (binaries included) is returned unchanged.
defp normalize_diff_value(value) do
  case value do
    nil -> ""
    present -> present
  end
end
defp hash_text(text), do: :crypto.hash(:sha256, text) |> Base.encode16(case: :lower) defp hash_text(text), do: :crypto.hash(:sha256, text) |> Base.encode16(case: :lower)
defp decode_vector(nil), do: [] defp decode_vector(nil), do: []

View File

@@ -74,12 +74,20 @@ defmodule BDS.Embeddings.Index do
end end
end end
def duplicate_pairs(project_id, threshold) when is_binary(project_id) do def duplicate_pairs(project_id, threshold, opts \\ []) when is_binary(project_id) do
with {:ok, snapshot} <- read(project_id) do with {:ok, snapshot} <- read(project_id) do
entries = Map.get(snapshot, "entries", %{})
entry_count = map_size(entries)
on_progress = progress_callback(opts)
:ok = report_scan_started(on_progress, entry_count, "embedding entries")
pairs = pairs =
snapshot entries
|> Map.get("entries", %{}) |> Enum.with_index(1)
|> Enum.flat_map(fn {post_id, entry} -> |> Enum.flat_map(fn {{post_id, entry}, index} ->
:ok = report_scan_progress(on_progress, index, entry_count, "embedding entries")
entry entry
|> Map.get("neighbors", []) |> Map.get("neighbors", [])
|> Enum.filter(&(&1["score"] >= threshold)) |> Enum.filter(&(&1["score"] >= threshold))
@@ -197,4 +205,31 @@ defmodule BDS.Embeddings.Index do
defp sort_pair(post_id_a, post_id_b) when post_id_a <= post_id_b, do: {post_id_a, post_id_b} defp sort_pair(post_id_a, post_id_b) when post_id_a <= post_id_b, do: {post_id_a, post_id_b}
defp sort_pair(post_id_a, post_id_b), do: {post_id_b, post_id_a} defp sort_pair(post_id_a, post_id_b), do: {post_id_b, post_id_a}
# Reads the optional `:on_progress` entry from `opts`.
#
# Only a 2-arity function is accepted as a reporter; anything else
# (including a missing key) results in `nil`.
defp progress_callback(opts) do
  candidate = Keyword.get(opts, :on_progress)

  if is_function(candidate, 2) do
    candidate
  else
    nil
  end
end
# Emits the initial progress event for a scan.
#
# * no reporter (`nil`)  -> nothing is emitted
# * `total` is integer 0 -> reports 1.0 with a "No ... to scan" message
# * otherwise            -> reports 0.0 with a "Scanning 0/total ..." message
#
# Always returns `:ok`.
defp report_scan_started(callback, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {reporter, 0} ->
      reporter.(1.0, "No #{label} to scan")
      :ok

    {reporter, count} ->
      reporter.(0.0, "Scanning 0/#{count} #{label}")
      :ok
  end
end
# Emits a per-entry scan progress event as `current / total` with a
# "Scanning current/total label" message.
#
# Skipped when the reporter is `nil` or `total` is the integer 0.
# Always returns `:ok`.
defp report_scan_progress(callback, current, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {_reporter, 0} ->
      :ok

    {reporter, _total} ->
      reporter.(current / total, "Scanning #{current}/#{total} #{label}")
      :ok
  end
end
end end

View File

@@ -37,13 +37,22 @@ defmodule BDS.Generation do
}} }}
end end
def generate_site(project_id, sections \\ [:core]) def generate_site(project_id, sections \\ [:core], opts \\ [])
when is_binary(project_id) and is_list(sections) do
def generate_site(project_id, sections, opts)
when is_binary(project_id) and is_list(sections) and is_list(opts) do
with {:ok, plan} <- plan_generation(project_id, sections) do with {:ok, plan} <- plan_generation(project_id, sections) do
outputs = build_outputs(plan) outputs = build_outputs(plan)
on_progress = progress_callback(opts)
total_outputs = length(outputs)
Enum.each(outputs, fn {relative_path, content} -> :ok = report_generation_started(on_progress, total_outputs, "generated files")
outputs
|> Enum.with_index(1)
|> Enum.each(fn {{relative_path, content}, index} ->
{:ok, _write} = write_generated_file(project_id, relative_path, content) {:ok, _write} = write_generated_file(project_id, relative_path, content)
:ok = report_generation_progress(on_progress, index, total_outputs, "generated files")
end) end)
{:ok, generated_files} = list_generated_files(project_id) {:ok, generated_files} = list_generated_files(project_id)
@@ -51,13 +60,27 @@ defmodule BDS.Generation do
end end
end end
def validate_site(project_id, sections \\ @core_sections) def validate_site(project_id, sections \\ @core_sections, opts \\ [])
def validate_site(project_id, sections) when is_binary(project_id) and is_list(sections) do def validate_site(project_id, sections, opts) when is_binary(project_id) and is_list(sections) and is_list(opts) do
with {:ok, plan} <- plan_generation(project_id, sections) do with {:ok, plan} <- plan_generation(project_id, sections) do
expected_outputs = build_outputs(plan) expected_outputs = build_outputs(plan)
on_progress = progress_callback(opts)
total_outputs = length(expected_outputs)
:ok = report_generation_started(on_progress, total_outputs, "generated files")
expected_paths = MapSet.new(Enum.map(expected_outputs, &elem(&1, 0))) expected_paths = MapSet.new(Enum.map(expected_outputs, &elem(&1, 0)))
expected_hashes = Map.new(expected_outputs, fn {relative_path, content} -> {relative_path, sha256(content)} end)
expected_hashes =
expected_outputs
|> Enum.with_index(1)
|> Enum.map(fn {{relative_path, content}, index} ->
:ok = report_generation_progress(on_progress, index, total_outputs, "generated files")
{relative_path, sha256(content)}
end)
|> Map.new()
actual_files = disk_generated_files(project_id) actual_files = disk_generated_files(project_id)
actual_paths = MapSet.new(Map.keys(actual_files)) actual_paths = MapSet.new(Map.keys(actual_files))
@@ -94,6 +117,33 @@ defmodule BDS.Generation do
end end
end end
# Looks up the optional `:on_progress` reporter in `opts`.
#
# The value is returned only if it is a function of arity 2; otherwise
# (absent key or any other term) the result is `nil`.
defp progress_callback(opts) do
  maybe_reporter = Keyword.get(opts, :on_progress)

  if is_function(maybe_reporter, 2), do: maybe_reporter, else: nil
end
# Emits the initial progress event for processing generated outputs.
#
# * no reporter (`nil`)  -> nothing is emitted
# * `total` is integer 0 -> reports 1.0 with a "No ... to process" message
# * otherwise            -> reports 0.0 with a "Processing 0/total ..." message
#
# Always returns `:ok`.
defp report_generation_started(callback, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {reporter, 0} ->
      reporter.(1.0, "No #{label} to process")
      :ok

    {reporter, count} ->
      reporter.(0.0, "Processing 0/#{count} #{label}")
      :ok
  end
end
# Emits a per-output progress event as `current / total` with a
# "Processing current/total label" message.
#
# Skipped when the reporter is `nil` or `total` is the integer 0.
# Always returns `:ok`.
defp report_generation_progress(callback, current, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {_reporter, 0} ->
      :ok

    {reporter, _total} ->
      reporter.(current / total, "Processing #{current}/#{total} #{label}")
      :ok
  end
end
def apply_validation(project_id, sections) when is_binary(project_id) and is_list(sections) do def apply_validation(project_id, sections) when is_binary(project_id) and is_list(sections) do
with {:ok, plan} <- plan_generation(project_id, sections) do with {:ok, plan} <- plan_generation(project_id, sections) do
expected_outputs = build_outputs(plan) expected_outputs = build_outputs(plan)

View File

@@ -83,7 +83,7 @@ defmodule BDS.Maintenance do
:media -> BDS.Media.rebuild_media_from_files(project_id, opts) :media -> BDS.Media.rebuild_media_from_files(project_id, opts)
:script -> BDS.Scripts.rebuild_scripts_from_files(project_id, opts) :script -> BDS.Scripts.rebuild_scripts_from_files(project_id, opts)
:template -> BDS.Templates.rebuild_templates_from_files(project_id, opts) :template -> BDS.Templates.rebuild_templates_from_files(project_id, opts)
:embedding -> Embeddings.rebuild_project(project_id) :embedding -> Embeddings.rebuild_project(project_id, opts)
:unsupported -> {:error, :unsupported_entity_type} :unsupported -> {:error, :unsupported_entity_type}
end end
end end
@@ -686,18 +686,13 @@ defmodule BDS.Maintenance do
:file_to_db -> :file_to_db ->
post_ids = Enum.map(items, &metadata_diff_item_entity_id/1) post_ids = Enum.map(items, &metadata_diff_item_entity_id/1)
case Embeddings.repair_posts(project_id, post_ids) do {:ok, repaired_post_ids} = Embeddings.repair_posts(project_id, post_ids)
{:ok, repaired_post_ids} ->
repaired_post_ids = MapSet.new(repaired_post_ids) repaired_post_ids = MapSet.new(repaired_post_ids)
build_batch_repair_result(items, total, on_progress, fn item -> build_batch_repair_result(items, total, on_progress, fn item ->
MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item)) MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item))
end) end)
_other ->
build_batch_repair_result(items, total, on_progress, fn _item -> false end)
end
:db_to_file -> :db_to_file ->
repaired? = Embeddings.refresh_snapshot(project_id) == :ok repaired? = Embeddings.refresh_snapshot(project_id) == :ok
build_batch_repair_result(items, total, on_progress, fn _item -> repaired? end) build_batch_repair_result(items, total, on_progress, fn _item -> repaired? end)

View File

@@ -467,9 +467,11 @@ defmodule BDS.Media do
end end
end end
def regenerate_missing_thumbnails(project_id) do def regenerate_missing_thumbnails(project_id, opts \\ []) do
project = Projects.get_project!(project_id) project = Projects.get_project!(project_id)
on_progress = progress_callback(opts)
media_items =
Repo.all( Repo.all(
from(media in Media, from(media in Media,
where: media.project_id == ^project_id, where: media.project_id == ^project_id,
@@ -480,13 +482,20 @@ defmodule BDS.Media do
String.starts_with?(media.mime_type || "", "image/") and String.starts_with?(media.mime_type || "", "image/") and
not String.contains?(media.mime_type || "", "svg") not String.contains?(media.mime_type || "", "svg")
end) end)
|> Enum.reduce(%{processed: 0, generated: 0, failed: 0}, fn media, acc ->
total_media = length(media_items)
:ok = report_rebuild_started(on_progress, total_media, "image assets")
media_items
|> Enum.with_index(1)
|> Enum.reduce(%{processed: 0, generated: 0, failed: 0}, fn {media, index}, acc ->
missing_paths = missing_paths =
media media
|> thumbnail_paths() |> thumbnail_paths()
|> Enum.map(fn {_size, relative_path} -> Path.join(Projects.project_data_dir(project), relative_path) end) |> Enum.map(fn {_size, relative_path} -> Path.join(Projects.project_data_dir(project), relative_path) end)
|> Enum.reject(&File.exists?/1) |> Enum.reject(&File.exists?/1)
next_acc =
if missing_paths == [] do if missing_paths == [] do
%{acc | processed: acc.processed + 1} %{acc | processed: acc.processed + 1}
else else
@@ -503,6 +512,9 @@ defmodule BDS.Media do
%{acc | processed: acc.processed + 1, failed: acc.failed + 1} %{acc | processed: acc.processed + 1, failed: acc.failed + 1}
end end
end end
:ok = report_rebuild_progress(on_progress, index, total_media, "image assets")
next_acc
end) end)
end end
@@ -553,7 +565,10 @@ defmodule BDS.Media do
if Keyword.get(opts, :reindex_search, true) do if Keyword.get(opts, :reindex_search, true) do
:ok = report_rebuild_phase(on_progress, 0.99, "Refreshing media search index") :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing media search index")
:ok = Search.reindex_media(project.id) :ok =
Search.reindex_media(project.id,
on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0)
)
end end
{:ok, media_items} {:ok, media_items}
@@ -886,6 +901,15 @@ defmodule BDS.Media do
end end
end end
# Wraps a 2-arity progress reporter so that the values it receives are
# mapped linearly onto the interval from `start_value` to `end_value`
# before being forwarded; the message is passed through untouched.
#
# A `nil` reporter stays `nil`, so callers can hand the result on as an
# optional reporter.
defp scaled_progress_reporter(report, start_value, end_value) when is_function(report, 2) do
  fn fraction, message ->
    report.(start_value + (end_value - start_value) * fraction, message)
  end
end

defp scaled_progress_reporter(nil, _start_value, _end_value), do: nil
defp report_rebuild_started(nil, _total, _label), do: :ok defp report_rebuild_started(nil, _total, _label), do: :ok
defp report_rebuild_started(callback, 0, label) do defp report_rebuild_started(callback, 0, label) do

View File

@@ -183,12 +183,18 @@ defmodule BDS.Posts do
if Keyword.get(opts, :reindex_search, true) do if Keyword.get(opts, :reindex_search, true) do
:ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index") :ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index")
:ok = Search.reindex_posts(project_id) :ok =
Search.reindex_posts(project_id,
on_progress: scaled_progress_reporter(on_progress, 0.97, 0.99)
)
end end
if Keyword.get(opts, :rebuild_embeddings, true) do if Keyword.get(opts, :rebuild_embeddings, true) do
:ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings") :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings")
{:ok, _rebuilt_post_ids} = Embeddings.rebuild_project(project_id) {:ok, _rebuilt_post_ids} =
Embeddings.rebuild_project(project_id,
on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0)
)
end end
{:ok, posts} {:ok, posts}
@@ -487,8 +493,9 @@ defmodule BDS.Posts do
|> Enum.sort_by(fn %{year: year, month: month} -> {-year, -month} end) |> Enum.sort_by(fn %{year: year, month: month} -> {-year, -month} end)
end end
def rebuild_post_links(project_id) do def rebuild_post_links(project_id, opts \\ []) do
post_ids = Repo.all(from(post in Post, where: post.project_id == ^project_id, select: post.id)) post_ids = Repo.all(from(post in Post, where: post.project_id == ^project_id, select: post.id))
on_progress = progress_callback(opts)
Repo.delete_all( Repo.delete_all(
from(link in Link, from(link in Link,
@@ -496,8 +503,16 @@ defmodule BDS.Posts do
) )
) )
Repo.all(from(post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at])) posts = Repo.all(from(post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at]))
|> Enum.each(&PostLinks.sync_post_links/1) total_posts = length(posts)
:ok = report_rebuild_started(on_progress, total_posts, "post links")
posts
|> Enum.with_index(1)
|> Enum.each(fn {post, index} ->
PostLinks.sync_post_links(post)
:ok = report_rebuild_progress(on_progress, index, total_posts, "post links")
end)
:ok :ok
end end
@@ -564,8 +579,9 @@ defmodule BDS.Posts do
end end
end end
def validate_translations(project_id) do def validate_translations(project_id, opts \\ []) do
{:ok, metadata} = Metadata.get_project_metadata(project_id) {:ok, metadata} = Metadata.get_project_metadata(project_id)
on_progress = progress_callback(opts)
posts = posts =
Repo.all( Repo.all(
@@ -574,6 +590,9 @@ defmodule BDS.Posts do
order_by: [asc: post.created_at, asc: post.slug] order_by: [asc: post.created_at, asc: post.slug]
) )
total_posts = length(posts)
:ok = report_rebuild_started(on_progress, total_posts, "published posts")
translation_languages = translation_languages =
Repo.all( Repo.all(
from translation in Translation, from translation in Translation,
@@ -595,9 +614,12 @@ defmodule BDS.Posts do
missing = missing =
posts posts
|> Enum.flat_map(fn post -> |> Enum.with_index(1)
|> Enum.flat_map(fn {post, index} ->
available = Map.get(translation_languages, post.id, []) available = Map.get(translation_languages, post.id, [])
:ok = report_rebuild_progress(on_progress, index, total_posts, "published posts")
cond do cond do
post.do_not_translate -> post.do_not_translate ->
[] []
@@ -1336,6 +1358,15 @@ defmodule BDS.Posts do
end end
end end
# Builds a reporter that rescales incoming progress values into the
# sub-range running from `start_value` to `end_value` and then calls the
# wrapped 2-arity `report` function with the rescaled value and the
# original message.
#
# Returns `nil` when no reporter was supplied.
defp scaled_progress_reporter(report, start_value, end_value) when is_function(report, 2) do
  fn inner_value, message ->
    mapped_value = start_value + (end_value - start_value) * inner_value
    report.(mapped_value, message)
  end
end

defp scaled_progress_reporter(nil, _start_value, _end_value), do: nil
defp report_rebuild_started(nil, _total, _label), do: :ok defp report_rebuild_started(nil, _total, _label), do: :ok
defp report_rebuild_started(callback, 0, label) do defp report_rebuild_started(callback, 0, label) do

View File

@@ -128,26 +128,46 @@ defmodule BDS.Search do
:ok :ok
end end
def reindex_posts(project_id) do def reindex_posts(project_id, opts \\ []) do
Repo.query!( Repo.query!(
"DELETE FROM posts_fts WHERE post_id IN (SELECT id FROM posts WHERE project_id = ?)", "DELETE FROM posts_fts WHERE post_id IN (SELECT id FROM posts WHERE project_id = ?)",
[project_id] [project_id]
) )
Repo.all(from post in Post, where: post.project_id == ^project_id) posts = Repo.all(from post in Post, where: post.project_id == ^project_id)
|> Enum.each(&insert_post_index/1) on_progress = progress_callback(opts)
total_posts = length(posts)
:ok = report_reindex_started(on_progress, total_posts, "posts")
posts
|> Enum.with_index(1)
|> Enum.each(fn {post, index} ->
insert_post_index(post)
:ok = report_reindex_progress(on_progress, index, total_posts, "posts")
end)
:ok :ok
end end
def reindex_media(project_id) do def reindex_media(project_id, opts \\ []) do
Repo.query!( Repo.query!(
"DELETE FROM media_fts WHERE media_id IN (SELECT id FROM media WHERE project_id = ?)", "DELETE FROM media_fts WHERE media_id IN (SELECT id FROM media WHERE project_id = ?)",
[project_id] [project_id]
) )
Repo.all(from media in Media, where: media.project_id == ^project_id) media_items = Repo.all(from media in Media, where: media.project_id == ^project_id)
|> Enum.each(&insert_media_index/1) on_progress = progress_callback(opts)
total_media = length(media_items)
:ok = report_reindex_started(on_progress, total_media, "media items")
media_items
|> Enum.with_index(1)
|> Enum.each(fn {media, index} ->
insert_media_index(media)
:ok = report_reindex_progress(on_progress, index, total_media, "media items")
end)
:ok :ok
end end
@@ -192,6 +212,33 @@ defmodule BDS.Search do
:ok :ok
end end
# Fetches the optional `:on_progress` reporter from the keyword options.
#
# A 2-arity function is returned as-is; a missing key or any other value
# produces `nil`.
defp progress_callback(opts) do
  on_progress = Keyword.get(opts, :on_progress)

  if is_function(on_progress, 2) do
    on_progress
  else
    nil
  end
end
# Emits the initial progress event for a reindex run.
#
# * no reporter (`nil`)  -> nothing is emitted
# * `total` is integer 0 -> reports 1.0 with a "No ... to reindex" message
# * otherwise            -> reports 0.0 with a "Reindexing 0/total ..." message
#
# Always returns `:ok`.
defp report_reindex_started(callback, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {reporter, 0} ->
      reporter.(1.0, "No #{label} to reindex")
      :ok

    {reporter, count} ->
      reporter.(0.0, "Reindexing 0/#{count} #{label}")
      :ok
  end
end
# Emits a per-record reindex progress event as `current / total` with a
# "Reindexing current/total label" message.
#
# Skipped when the reporter is `nil` or `total` is the integer 0.
# Always returns `:ok`.
defp report_reindex_progress(callback, current, total, label) do
  case {callback, total} do
    {nil, _total} ->
      :ok

    {_reporter, 0} ->
      :ok

    {reporter, _total} ->
      reporter.(current / total, "Reindexing #{current}/#{total} #{label}")
      :ok
  end
end
defp insert_post_index(%Post{} = post) do defp insert_post_index(%Post{} = post) do
{title, excerpt, content, tags, categories} = post_index_fields(post) {title, excerpt, content, tags, categories} = post_index_fields(post)

View File

@@ -3,6 +3,21 @@ defmodule BDS.Desktop.ShellCommandsTest do
alias BDS.Desktop.ShellCommands alias BDS.Desktop.ShellCommands
defmodule SlowEmbeddingBackend do
  # Test-only embedding backend: behaves like the in-app backend but
  # pauses 10 ms before every `embed/2` call, which stretches out any
  # work that embeds many texts.
  @behaviour BDS.Embeddings.Backend

  alias BDS.Embeddings.Backends.InApp

  # Reports the same model information as the in-app backend.
  @impl true
  def model_info, do: InApp.model_info()

  # Sleeps briefly, then delegates the actual embedding to the in-app backend.
  @impl true
  def embed(text, opts) do
    Process.sleep(10)
    InApp.embed(text, opts)
  end
end
setup do setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo) :ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo)
Ecto.Adapters.SQL.Sandbox.mode(BDS.Repo, {:shared, self()}) Ecto.Adapters.SQL.Sandbox.mode(BDS.Repo, {:shared, self()})
@@ -177,6 +192,64 @@ defmodule BDS.Desktop.ShellCommandsTest do
assert is_map(completed.result.payload.summary) assert is_map(completed.result.payload.summary)
end end
# Checks that the "rebuild_embedding_index" command reports intermediate
# progress while its task is still running (a value strictly between 0.0 and
# 1.0 with a message containing "/"), and that the task later completes with
# progress 1.0.
test "rebuild_embedding_index exposes live in-task progress while rebuilding posts", %{project: project} do
# Remember the current application env so it can be restored in on_exit.
original = Application.get_env(:bds, :tasks, [])
original_embeddings = Application.get_env(:bds, :embeddings)
# Run one task at a time and set the progress throttle to 0 ms.
# NOTE(review): presumably a 0 ms throttle lets every progress report
# through — confirm against the Tasks implementation.
Application.put_env(
:bds,
:tasks,
original
|> Keyword.put(:max_concurrent, 1)
|> Keyword.put(:progress_throttle_ms, 0)
)
# Swap in the backend that sleeps 10 ms per embed call.
Application.put_env(:bds, :embeddings, backend: SlowEmbeddingBackend)
# Restore both env entries; :embeddings is deleted if it was unset before.
on_exit(fn ->
Application.put_env(:bds, :tasks, original)
if is_nil(original_embeddings) do
Application.delete_env(:bds, :embeddings)
else
Application.put_env(:bds, :embeddings, original_embeddings)
end
end)
# Turn on semantic similarity for the project.
assert {:ok, _metadata} =
BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true})
# Create and publish 40 posts so the rebuild has many items to process.
Enum.each(1..40, fn index ->
assert {:ok, post} =
BDS.Posts.create_post(%{
project_id: project.id,
title: "Embedding Progress #{index}",
content: "space rocket orbit mission galaxy #{index}",
language: "en"
})
assert {:ok, _published_post} = BDS.Posts.publish_post(post.id)
end)
# Remove all stored embedding keys before triggering the rebuild.
# NOTE(review): presumably this forces every post to be re-embedded — confirm.
BDS.Repo.delete_all(BDS.Embeddings.Key)
assert {:ok, result} = ShellCommands.execute("rebuild_embedding_index")
# Wait (up to 10 s) for a snapshot of the task that is running with partial
# progress and a "current/total"-style message.
progressed =
wait_for_task(
result.task_id,
&(&1.status == :running and is_number(&1.progress) and &1.progress > 0.0 and &1.progress < 1.0 and
is_binary(&1.message) and String.contains?(&1.message, "/")),
10_000
)
assert progressed.group_name == "Embeddings"
assert String.contains?(progressed.message, "/")
# Finally the task must reach :completed with progress 1.0.
assert wait_for_task(result.task_id, &(&1.status == :completed and &1.progress == 1.0), 10_000).status ==
:completed
end
test "rebuild_database fans out tracked maintenance tasks for the active project" do test "rebuild_database fans out tracked maintenance tasks for the active project" do
assert {:ok, result} = ShellCommands.execute("rebuild_database") assert {:ok, result} = ShellCommands.execute("rebuild_database")