diff --git a/lib/bds/desktop/shell_commands.ex b/lib/bds/desktop/shell_commands.ex index ed22501..8d7aa7c 100644 --- a/lib/bds/desktop/shell_commands.ex +++ b/lib/bds/desktop/shell_commands.ex @@ -81,16 +81,14 @@ defmodule BDS.Desktop.ShellCommands do {:ok, posts_task} = Tasks.submit_task("Reindex Search Text", fn report -> - report.(0.0, "Clearing and rebuilding post search indexes") - :ok = Search.reindex_posts(project.id) + :ok = Search.reindex_posts(project.id, on_progress: report) report.(1.0, "Post search text reindexed") %{project_id: project.id, entity: "posts"} end, attrs) {:ok, _media_task} = Tasks.submit_task("Reindex Media Search Text", fn report -> - report.(0.0, "Clearing and rebuilding media search indexes") - :ok = Search.reindex_media(project.id) + :ok = Search.reindex_media(project.id, on_progress: report) report.(1.0, "Media search text reindexed") %{project_id: project.id, entity: "media"} end, attrs) @@ -110,8 +108,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("rebuild_embedding_index", project, _params) do queue_task(project, "rebuild_embedding_index", "Rebuild Embedding Index", "Embeddings", fn report -> - report.(0.2, "Rebuilding semantic index") - {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id) + {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id, on_progress: report) report.(1.0, "Embedding index rebuilt") %{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)} end) @@ -151,8 +148,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("rebuild_post_links", project, _params) do queue_task(project, "rebuild_post_links", "Rebuild Post Links", "Maintenance", fn report -> - report.(0.0, "Rebuilding link graph") - :ok = Posts.rebuild_post_links(project.id) + :ok = Posts.rebuild_post_links(project.id, on_progress: report) report.(1.0, "Post links rebuilt") %{project_id: project.id} end) @@ -160,8 +156,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("regenerate_missing_thumbnails", project, _params) do queue_task(project, "regenerate_missing_thumbnails", "Regenerate Missing Thumbnails", "Maintenance", fn report -> - report.(0.0, "Checking missing thumbnails") - result = BDS.Media.regenerate_missing_thumbnails(project.id) + result = BDS.Media.regenerate_missing_thumbnails(project.id, on_progress: report) report.(1.0, "Missing thumbnails regenerated") Map.put(result, :project_id, project.id) end) @@ -197,8 +192,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("generate_sitemap", project, _params) do queue_task(project, "generate_sitemap", "Generate Sitemap", "Generation", fn report -> - report.(0.2, "Generating site output") - {:ok, generation} = Generation.generate_site(project.id, @site_sections) + {:ok, generation} = Generation.generate_site(project.id, @site_sections, on_progress: report) report.(1.0, "Generated site output") %{project_id: project.id, sections: generation.sections, generated_count: length(generation.generated_files)} end) @@ -206,8 +200,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("validate_site", project, _params) do queue_task(project, "validate_site", "Validate Site", "Validation", fn report -> - report.(0.2, "Validating generated site output") - {:ok, validation} = Generation.validate_site(project.id, @site_sections) + {:ok, validation} = Generation.validate_site(project.id, @site_sections, on_progress: report) report.(1.0, "Site validation complete") site_validation_result(project.id, validation) end) @@ -270,8 +263,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("validate_translations", project, _params) do queue_task(project, "validate_translations", "Validate Translations", "Validation", fn report -> - report.(0.2, "Checking published translations") - {:ok, translation_report} = Posts.validate_translations(project.id) + {:ok, translation_report} = Posts.validate_translations(project.id, on_progress: report) report.(1.0, "Translation validation complete") translation_validation_result(project.id, translation_report) end) @@ -279,8 +271,7 @@ defmodule BDS.Desktop.ShellCommands do defp dispatch("find_duplicates", project, _params) do queue_task(project, "find_duplicates", "Find Duplicate Posts", "Embeddings", fn report -> - report.(0.2, "Checking for duplicate posts") - {:ok, pairs} = Embeddings.find_duplicates(project.id) + {:ok, pairs} = Embeddings.find_duplicates(project.id, on_progress: report) report.(1.0, "Duplicate search complete") duplicate_search_result(project.id, pairs) end) @@ -376,8 +367,7 @@ defmodule BDS.Desktop.ShellCommands do %{ name: "Rebuild Post Links", work: fn report -> - report.(0.0, "Rebuilding link graph") - :ok = Posts.rebuild_post_links(project.id) + :ok = Posts.rebuild_post_links(project.id, on_progress: report) report.(1.0, "Post links rebuilt") %{project_id: project.id} end @@ -385,8 +375,7 @@ defmodule BDS.Desktop.ShellCommands do %{ name: "Regenerate Missing Thumbnails", work: fn report -> - report.(0.0, "Checking missing thumbnails") - result = BDS.Media.regenerate_missing_thumbnails(project.id) + result = BDS.Media.regenerate_missing_thumbnails(project.id, on_progress: report) report.(1.0, "Missing thumbnails regenerated") Map.put(result, :project_id, project.id) end @@ -394,8 +383,7 @@ defmodule BDS.Desktop.ShellCommands do %{ name: "Rebuild Embedding Index", work: fn report -> - report.(0.0, "Rebuilding semantic index") - {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id) + {:ok, rebuilt_post_ids} = Embeddings.rebuild_project(project.id, on_progress: report) report.(1.0, "Embedding index rebuilt") %{project_id: project.id, rebuilt_post_ids: rebuilt_post_ids, rebuilt_count: length(rebuilt_post_ids)} end diff --git a/lib/bds/embeddings.ex b/lib/bds/embeddings.ex index ed82d79..fd663c6 100644 --- a/lib/bds/embeddings.ex +++ b/lib/bds/embeddings.ex @@ -80,19 +80,33 @@ defmodule BDS.Embeddings do end end - def rebuild_project(project_id) when is_binary(project_id) do + def rebuild_project(project_id, opts \\ []) + + def rebuild_project(project_id, opts) when is_binary(project_id) and is_list(opts) do if enabled_for_project?(project_id) do + on_progress = progress_callback(opts) + posts = Repo.all(from post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at, asc: post.slug]) post_ids = Enum.map(posts, & &1.id) + total_posts = length(posts) + + :ok = report_rebuild_started(on_progress, total_posts, "embedding entries") Repo.delete_all( from key in Key, where: key.project_id == ^project_id and key.post_id not in ^post_ids ) - Enum.each(posts, &sync_post_if_enabled(&1, refresh_index: false)) + posts + |> Enum.with_index(1) + |> Enum.each(fn {post, index} -> + sync_post_if_enabled(post, refresh_index: false) + :ok = report_rebuild_progress(on_progress, index, total_posts, "embedding entries") + end) + + :ok = report_rebuild_phase(on_progress, 0.99, "Persisting embedding snapshot") :ok = rebuild_snapshot(project_id) {:ok, post_ids} else @@ -293,12 +307,13 @@ defmodule BDS.Embeddings do end end - def find_duplicates(project_id) when is_binary(project_id) do + def find_duplicates(project_id, opts \\ []) when is_binary(project_id) do if enabled_for_project?(project_id) do + on_progress = progress_callback(opts) dismissed = dismissed_pair_keys(project_id) duplicates = - case Index.duplicate_pairs(project_id, @duplicate_threshold) do + case Index.duplicate_pairs(project_id, @duplicate_threshold, on_progress: on_progress) do {:ok, pairs} -> pairs |> Enum.reject(fn pair -> pair_key(pair.post_id_a, pair.post_id_b) in dismissed end) @@ -306,22 +321,31 @@ defmodule BDS.Embeddings do {:error, :missing} -> keys = Repo.all(from key in Key, where: key.project_id == ^project_id, order_by: [asc: key.post_id]) + total_keys = length(keys) - for left <- keys, - right <- keys, - left.post_id < right.post_id, - pair_key(left.post_id, right.post_id) not in dismissed, - similarity = cosine_similarity(decode_vector(left.vector), decode_vector(right.vector)), - similarity >= @duplicate_threshold do - %{ - post_id_a: left.post_id, - post_id_b: right.post_id, - score: similarity - } - end + :ok = report_rebuild_started(on_progress, total_keys, "embedding entries") + + keys + |> Enum.with_index(1) + |> Enum.flat_map(fn {left, index} -> + :ok = report_rebuild_progress(on_progress, index, total_keys, "embedding entries") + + for right <- keys, + left.post_id < right.post_id, + pair_key(left.post_id, right.post_id) not in dismissed, + similarity = cosine_similarity(decode_vector(left.vector), decode_vector(right.vector)), + similarity >= @duplicate_threshold do + %{ + post_id_a: left.post_id, + post_id_b: right.post_id, + score: similarity + } + end + end) |> enrich_duplicate_pairs(project_id) end + :ok = report_rebuild_phase(on_progress, 0.99, "Resolving duplicate candidates") {:ok, duplicates} else {:ok, []} @@ -501,6 +525,40 @@ defmodule BDS.Embeddings do Index.rebuild(project_id, model_id: model_id(), dimensions: dimensions()) end + defp progress_callback(opts) do + case Keyword.get(opts, :on_progress) do + callback when is_function(callback, 2) -> callback + _other -> nil + end + end + + defp report_rebuild_started(nil, _total, _label), do: :ok + + defp report_rebuild_started(callback, 0, label) do + callback.(1.0, "No #{label} to rebuild") + :ok + end + + defp report_rebuild_started(callback, total, label) do + callback.(0.0, "Rebuilding 0/#{total} #{label}") + :ok + end + + defp report_rebuild_progress(nil, _current, _total, _label), do: :ok + defp report_rebuild_progress(_callback, _current, 0, _label), do: :ok + + defp report_rebuild_progress(callback, current, total, label) do + callback.(current / total, "Rebuilding #{current}/#{total} #{label}") + :ok + end + + defp report_rebuild_phase(nil, _value, _label), do: :ok + + defp report_rebuild_phase(callback, value, label) do + callback.(value, label) + :ok + end + defp snapshot_content_hash(project_id, post_id) do case Index.read(project_id) do {:ok, snapshot} -> get_in(snapshot, ["entries", post_id, "content_hash"]) @@ -527,8 +585,8 @@ defmodule BDS.Embeddings do end defp diff_field(name, db_value, file_value) do - db_value = if(is_binary(db_value), do: db_value, else: db_value || "") - file_value = if(is_binary(file_value), do: file_value, else: file_value || "") + db_value = normalize_diff_value(db_value) + file_value = normalize_diff_value(file_value) if db_value == file_value do nil @@ -537,6 +595,10 @@ defmodule BDS.Embeddings do end end + defp normalize_diff_value(value) when is_binary(value), do: value + defp normalize_diff_value(nil), do: "" + defp normalize_diff_value(value), do: value + defp hash_text(text), do: :crypto.hash(:sha256, text) |> Base.encode16(case: :lower) defp decode_vector(nil), do: [] diff --git a/lib/bds/embeddings/index.ex b/lib/bds/embeddings/index.ex index 2be3a96..84922d3 100644 --- a/lib/bds/embeddings/index.ex +++ b/lib/bds/embeddings/index.ex @@ -74,12 +74,20 @@ defmodule BDS.Embeddings.Index do end end - def duplicate_pairs(project_id, threshold) when is_binary(project_id) do + def duplicate_pairs(project_id, threshold, opts \\ []) when is_binary(project_id) do with {:ok, snapshot} <- read(project_id) do + entries = Map.get(snapshot, "entries", %{}) + entry_count = map_size(entries) + on_progress = progress_callback(opts) + + :ok = report_scan_started(on_progress, entry_count, "embedding entries") + pairs = - snapshot - |> Map.get("entries", %{}) - |> Enum.flat_map(fn {post_id, entry} -> + entries + |> Enum.with_index(1) + |> Enum.flat_map(fn {{post_id, entry}, index} -> + :ok = report_scan_progress(on_progress, index, entry_count, "embedding entries") + entry |> Map.get("neighbors", []) |> Enum.filter(&(&1["score"] >= threshold)) @@ -197,4 +205,31 @@ defmodule BDS.Embeddings.Index do defp sort_pair(post_id_a, post_id_b) when post_id_a <= post_id_b, do: {post_id_a, post_id_b} defp sort_pair(post_id_a, post_id_b), do: {post_id_b, post_id_a} + + defp progress_callback(opts) do + case Keyword.get(opts, :on_progress) do + callback when is_function(callback, 2) -> callback + _other -> nil + end + end + + defp report_scan_started(nil, _total, _label), do: :ok + + defp report_scan_started(callback, 0, label) do + callback.(1.0, "No #{label} to scan") + :ok + end + + defp report_scan_started(callback, total, label) do + callback.(0.0, "Scanning 0/#{total} #{label}") + :ok + end + + defp report_scan_progress(nil, _current, _total, _label), do: :ok + defp report_scan_progress(_callback, _current, 0, _label), do: :ok + + defp report_scan_progress(callback, current, total, label) do + callback.(current / total, "Scanning #{current}/#{total} #{label}") + :ok + end end diff --git a/lib/bds/generation.ex b/lib/bds/generation.ex index ab84999..1d74215 100644 --- a/lib/bds/generation.ex +++ b/lib/bds/generation.ex @@ -37,13 +37,22 @@ defmodule BDS.Generation do }} end - def generate_site(project_id, sections \\ [:core]) - when is_binary(project_id) and is_list(sections) do + def generate_site(project_id, sections \\ [:core], opts \\ []) + + def generate_site(project_id, sections, opts) + when is_binary(project_id) and is_list(sections) and is_list(opts) do with {:ok, plan} <- plan_generation(project_id, sections) do outputs = build_outputs(plan) + on_progress = progress_callback(opts) + total_outputs = length(outputs) - Enum.each(outputs, fn {relative_path, content} -> + :ok = report_generation_started(on_progress, total_outputs, "generated files") + + outputs + |> Enum.with_index(1) + |> Enum.each(fn {{relative_path, content}, index} -> {:ok, _write} = write_generated_file(project_id, relative_path, content) + :ok = report_generation_progress(on_progress, index, total_outputs, "generated files") end) {:ok, generated_files} = list_generated_files(project_id) @@ -51,13 +60,27 @@ defmodule BDS.Generation do end end - def validate_site(project_id, sections \\ @core_sections) + def validate_site(project_id, sections \\ @core_sections, opts \\ []) - def validate_site(project_id, sections) when is_binary(project_id) and is_list(sections) do + def validate_site(project_id, sections, opts) when is_binary(project_id) and is_list(sections) and is_list(opts) do with {:ok, plan} <- plan_generation(project_id, sections) do expected_outputs = build_outputs(plan) + on_progress = progress_callback(opts) + total_outputs = length(expected_outputs) + + :ok = report_generation_started(on_progress, total_outputs, "generated files") + expected_paths = MapSet.new(Enum.map(expected_outputs, &elem(&1, 0))) - expected_hashes = Map.new(expected_outputs, fn {relative_path, content} -> {relative_path, sha256(content)} end) + + expected_hashes = + expected_outputs + |> Enum.with_index(1) + |> Enum.map(fn {{relative_path, content}, index} -> + :ok = report_generation_progress(on_progress, index, total_outputs, "generated files") + {relative_path, sha256(content)} + end) + |> Map.new() + actual_files = disk_generated_files(project_id) actual_paths = MapSet.new(Map.keys(actual_files)) @@ -94,6 +117,33 @@ defmodule BDS.Generation do end end + defp progress_callback(opts) do + case Keyword.get(opts, :on_progress) do + callback when is_function(callback, 2) -> callback + _other -> nil + end + end + + defp report_generation_started(nil, _total, _label), do: :ok + + defp report_generation_started(callback, 0, label) do + callback.(1.0, "No #{label} to process") + :ok + end + + defp report_generation_started(callback, total, label) do + callback.(0.0, "Processing 0/#{total} #{label}") + :ok + end + + defp report_generation_progress(nil, _current, _total, _label), do: :ok + defp report_generation_progress(_callback, _current, 0, _label), do: :ok + + defp report_generation_progress(callback, current, total, label) do + callback.(current / total, "Processing #{current}/#{total} #{label}") + :ok + end + def apply_validation(project_id, sections) when is_binary(project_id) and is_list(sections) do with {:ok, plan} <- plan_generation(project_id, sections) do expected_outputs = build_outputs(plan) diff --git a/lib/bds/maintenance.ex b/lib/bds/maintenance.ex index 0839846..1ea64a8 100644 --- a/lib/bds/maintenance.ex +++ b/lib/bds/maintenance.ex @@ -83,7 +83,7 @@ defmodule BDS.Maintenance do :media -> BDS.Media.rebuild_media_from_files(project_id, opts) :script -> BDS.Scripts.rebuild_scripts_from_files(project_id, opts) :template -> BDS.Templates.rebuild_templates_from_files(project_id, opts) - :embedding -> Embeddings.rebuild_project(project_id) + :embedding -> Embeddings.rebuild_project(project_id, opts) :unsupported -> {:error, :unsupported_entity_type} end end @@ -686,17 +686,12 @@ defmodule BDS.Maintenance do :file_to_db -> post_ids = Enum.map(items, &metadata_diff_item_entity_id/1) - case Embeddings.repair_posts(project_id, post_ids) do - {:ok, repaired_post_ids} -> - repaired_post_ids = MapSet.new(repaired_post_ids) + {:ok, repaired_post_ids} = Embeddings.repair_posts(project_id, post_ids) + repaired_post_ids = MapSet.new(repaired_post_ids) - build_batch_repair_result(items, total, on_progress, fn item -> - MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item)) - end) - - _other -> - build_batch_repair_result(items, total, on_progress, fn _item -> false end) - end + build_batch_repair_result(items, total, on_progress, fn item -> + MapSet.member?(repaired_post_ids, metadata_diff_item_entity_id(item)) + end) :db_to_file -> repaired? = Embeddings.refresh_snapshot(project_id) == :ok diff --git a/lib/bds/media.ex b/lib/bds/media.ex index 706c973..5b40a3f 100644 --- a/lib/bds/media.ex +++ b/lib/bds/media.ex @@ -467,42 +467,54 @@ defmodule BDS.Media do end end - def regenerate_missing_thumbnails(project_id) do + def regenerate_missing_thumbnails(project_id, opts \\ []) do project = Projects.get_project!(project_id) + on_progress = progress_callback(opts) - Repo.all( + media_items = + Repo.all( from(media in Media, where: media.project_id == ^project_id, order_by: [asc: media.created_at] ) - ) - |> Enum.filter(fn media -> - String.starts_with?(media.mime_type || "", "image/") and - not String.contains?(media.mime_type || "", "svg") - end) - |> Enum.reduce(%{processed: 0, generated: 0, failed: 0}, fn media, acc -> + ) + |> Enum.filter(fn media -> + String.starts_with?(media.mime_type || "", "image/") and + not String.contains?(media.mime_type || "", "svg") + end) + + total_media = length(media_items) + :ok = report_rebuild_started(on_progress, total_media, "image assets") + + media_items + |> Enum.with_index(1) + |> Enum.reduce(%{processed: 0, generated: 0, failed: 0}, fn {media, index}, acc -> missing_paths = media |> thumbnail_paths() |> Enum.map(fn {_size, relative_path} -> Path.join(Projects.project_data_dir(project), relative_path) end) |> Enum.reject(&File.exists?/1) - if missing_paths == [] do - %{acc | processed: acc.processed + 1} - else - try do - :ok = ensure_thumbnails(project, media) + next_acc = + if missing_paths == [] do + %{acc | processed: acc.processed + 1} + else + try do + :ok = ensure_thumbnails(project, media) - %{ - processed: acc.processed + 1, - generated: acc.generated + length(missing_paths), - failed: acc.failed - } - rescue - _error -> - %{acc | processed: acc.processed + 1, failed: acc.failed + 1} + %{ + processed: acc.processed + 1, + generated: acc.generated + length(missing_paths), + failed: acc.failed + } + rescue + _error -> + %{acc | processed: acc.processed + 1, failed: acc.failed + 1} + end end - end + + :ok = report_rebuild_progress(on_progress, index, total_media, "image assets") + next_acc end) end @@ -553,7 +565,10 @@ defmodule BDS.Media do if Keyword.get(opts, :reindex_search, true) do :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing media search index") - :ok = Search.reindex_media(project.id) + :ok = + Search.reindex_media(project.id, + on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0) + ) end {:ok, media_items} @@ -886,6 +901,15 @@ defmodule BDS.Media do end end + defp scaled_progress_reporter(nil, _start_value, _end_value), do: nil + + defp scaled_progress_reporter(report, start_value, end_value) when is_function(report, 2) do + fn value, message -> + scaled_value = start_value + (end_value - start_value) * value + report.(scaled_value, message) + end + end + defp report_rebuild_started(nil, _total, _label), do: :ok defp report_rebuild_started(callback, 0, label) do diff --git a/lib/bds/posts.ex b/lib/bds/posts.ex index ae5370f..74ba454 100644 --- a/lib/bds/posts.ex +++ b/lib/bds/posts.ex @@ -183,12 +183,18 @@ defmodule BDS.Posts do if Keyword.get(opts, :reindex_search, true) do :ok = report_rebuild_phase(on_progress, 0.97, "Refreshing post search index") - :ok = Search.reindex_posts(project_id) + :ok = + Search.reindex_posts(project_id, + on_progress: scaled_progress_reporter(on_progress, 0.97, 0.99) + ) end if Keyword.get(opts, :rebuild_embeddings, true) do :ok = report_rebuild_phase(on_progress, 0.99, "Refreshing post embeddings") - {:ok, _rebuilt_post_ids} = Embeddings.rebuild_project(project_id) + {:ok, _rebuilt_post_ids} = + Embeddings.rebuild_project(project_id, + on_progress: scaled_progress_reporter(on_progress, 0.99, 1.0) + ) end {:ok, posts} @@ -487,8 +493,9 @@ defmodule BDS.Posts do |> Enum.sort_by(fn %{year: year, month: month} -> {-year, -month} end) end - def rebuild_post_links(project_id) do + def rebuild_post_links(project_id, opts \\ []) do post_ids = Repo.all(from(post in Post, where: post.project_id == ^project_id, select: post.id)) + on_progress = progress_callback(opts) Repo.delete_all( from(link in Link, @@ -496,8 +503,16 @@ defmodule BDS.Posts do ) ) - Repo.all(from(post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at])) - |> Enum.each(&PostLinks.sync_post_links/1) + posts = Repo.all(from(post in Post, where: post.project_id == ^project_id, order_by: [asc: post.created_at])) + total_posts = length(posts) + :ok = report_rebuild_started(on_progress, total_posts, "post links") + + posts + |> Enum.with_index(1) + |> Enum.each(fn {post, index} -> + PostLinks.sync_post_links(post) + :ok = report_rebuild_progress(on_progress, index, total_posts, "post links") + end) :ok end @@ -564,8 +579,9 @@ defmodule BDS.Posts do end end - def validate_translations(project_id) do + def validate_translations(project_id, opts \\ []) do {:ok, metadata} = Metadata.get_project_metadata(project_id) + on_progress = progress_callback(opts) posts = Repo.all( @@ -574,6 +590,9 @@ defmodule BDS.Posts do order_by: [asc: post.created_at, asc: post.slug] ) + total_posts = length(posts) + :ok = report_rebuild_started(on_progress, total_posts, "published posts") + translation_languages = Repo.all( from translation in Translation, @@ -595,9 +614,12 @@ defmodule BDS.Posts do missing = posts - |> Enum.flat_map(fn post -> + |> Enum.with_index(1) + |> Enum.flat_map(fn {post, index} -> available = Map.get(translation_languages, post.id, []) + :ok = report_rebuild_progress(on_progress, index, total_posts, "published posts") + cond do post.do_not_translate -> [] @@ -1336,6 +1358,15 @@ defmodule BDS.Posts do end end + defp scaled_progress_reporter(nil, _start_value, _end_value), do: nil + + defp scaled_progress_reporter(report, start_value, end_value) when is_function(report, 2) do + fn value, message -> + scaled_value = start_value + (end_value - start_value) * value + report.(scaled_value, message) + end + end + defp report_rebuild_started(nil, _total, _label), do: :ok defp report_rebuild_started(callback, 0, label) do diff --git a/lib/bds/search.ex b/lib/bds/search.ex index ac86c04..5368ee8 100644 --- a/lib/bds/search.ex +++ b/lib/bds/search.ex @@ -128,26 +128,46 @@ defmodule BDS.Search do :ok end - def reindex_posts(project_id) do + def reindex_posts(project_id, opts \\ []) do Repo.query!( "DELETE FROM posts_fts WHERE post_id IN (SELECT id FROM posts WHERE project_id = ?)", [project_id] ) - Repo.all(from post in Post, where: post.project_id == ^project_id) - |> Enum.each(&insert_post_index/1) + posts = Repo.all(from post in Post, where: post.project_id == ^project_id) + on_progress = progress_callback(opts) + total_posts = length(posts) + + :ok = report_reindex_started(on_progress, total_posts, "posts") + + posts + |> Enum.with_index(1) + |> Enum.each(fn {post, index} -> + insert_post_index(post) + :ok = report_reindex_progress(on_progress, index, total_posts, "posts") + end) :ok end - def reindex_media(project_id) do + def reindex_media(project_id, opts \\ []) do Repo.query!( "DELETE FROM media_fts WHERE media_id IN (SELECT id FROM media WHERE project_id = ?)", [project_id] ) - Repo.all(from media in Media, where: media.project_id == ^project_id) - |> Enum.each(&insert_media_index/1) + media_items = Repo.all(from media in Media, where: media.project_id == ^project_id) + on_progress = progress_callback(opts) + total_media = length(media_items) + + :ok = report_reindex_started(on_progress, total_media, "media items") + + media_items + |> Enum.with_index(1) + |> Enum.each(fn {media, index} -> + insert_media_index(media) + :ok = report_reindex_progress(on_progress, index, total_media, "media items") + end) :ok end @@ -192,6 +212,33 @@ defmodule BDS.Search do :ok end + defp progress_callback(opts) do + case Keyword.get(opts, :on_progress) do + callback when is_function(callback, 2) -> callback + _other -> nil + end + end + + defp report_reindex_started(nil, _total, _label), do: :ok + + defp report_reindex_started(callback, 0, label) do + callback.(1.0, "No #{label} to reindex") + :ok + end + + defp report_reindex_started(callback, total, label) do + callback.(0.0, "Reindexing 0/#{total} #{label}") + :ok + end + + defp report_reindex_progress(nil, _current, _total, _label), do: :ok + defp report_reindex_progress(_callback, _current, 0, _label), do: :ok + + defp report_reindex_progress(callback, current, total, label) do + callback.(current / total, "Reindexing #{current}/#{total} #{label}") + :ok + end + defp insert_post_index(%Post{} = post) do {title, excerpt, content, tags, categories} = post_index_fields(post) diff --git a/test/bds/desktop/shell_commands_test.exs b/test/bds/desktop/shell_commands_test.exs index 3c07c2e..2fef977 100644 --- a/test/bds/desktop/shell_commands_test.exs +++ b/test/bds/desktop/shell_commands_test.exs @@ -3,6 +3,21 @@ defmodule BDS.Desktop.ShellCommandsTest do alias BDS.Desktop.ShellCommands + defmodule SlowEmbeddingBackend do + @behaviour BDS.Embeddings.Backend + + @impl true + def model_info do + BDS.Embeddings.Backends.InApp.model_info() + end + + @impl true + def embed(text, opts) do + Process.sleep(10) + BDS.Embeddings.Backends.InApp.embed(text, opts) + end + end + setup do :ok = Ecto.Adapters.SQL.Sandbox.checkout(BDS.Repo) Ecto.Adapters.SQL.Sandbox.mode(BDS.Repo, {:shared, self()}) @@ -177,6 +192,64 @@ defmodule BDS.Desktop.ShellCommandsTest do assert is_map(completed.result.payload.summary) end + test "rebuild_embedding_index exposes live in-task progress while rebuilding posts", %{project: project} do + original = Application.get_env(:bds, :tasks, []) + original_embeddings = Application.get_env(:bds, :embeddings) + + Application.put_env( + :bds, + :tasks, + original + |> Keyword.put(:max_concurrent, 1) + |> Keyword.put(:progress_throttle_ms, 0) + ) + + Application.put_env(:bds, :embeddings, backend: SlowEmbeddingBackend) + + on_exit(fn -> + Application.put_env(:bds, :tasks, original) + + if is_nil(original_embeddings) do + Application.delete_env(:bds, :embeddings) + else + Application.put_env(:bds, :embeddings, original_embeddings) + end + end) + + assert {:ok, _metadata} = + BDS.Metadata.update_project_metadata(project.id, %{semantic_similarity_enabled: true}) + + Enum.each(1..40, fn index -> + assert {:ok, post} = + BDS.Posts.create_post(%{ + project_id: project.id, + title: "Embedding Progress #{index}", + content: "space rocket orbit mission galaxy #{index}", + language: "en" + }) + + assert {:ok, _published_post} = BDS.Posts.publish_post(post.id) + end) + + BDS.Repo.delete_all(BDS.Embeddings.Key) + + assert {:ok, result} = ShellCommands.execute("rebuild_embedding_index") + + progressed = + wait_for_task( + result.task_id, + &(&1.status == :running and is_number(&1.progress) and &1.progress > 0.0 and &1.progress < 1.0 and + is_binary(&1.message) and String.contains?(&1.message, "/")), + 10_000 + ) + + assert progressed.group_name == "Embeddings" + assert String.contains?(progressed.message, "/") + + assert wait_for_task(result.task_id, &(&1.status == :completed and &1.progress == 1.0), 10_000).status == + :completed + end + test "rebuild_database fans out tracked maintenance tasks for the active project" do assert {:ok, result} = ShellCommands.execute("rebuild_database")